在锁中使用多个条件

锁可以和一个或多个条件相关联。在 Condition 接口中声明这些条件。 这些条件的目的是允许线程拥有对锁的控制并检查条件是否满足。如果不满足则挂起直到另一个线程唤醒它们。 Condition 接口提供了挂起线程和唤醒被挂起线程的机制。

任务

生产者从一个模拟物理文件读取数据放入缓冲区,多个消费者从缓冲区读出数据。

实现

  • (模拟)物理文件 : FileMock

    • 属性 String content[] : 文件内容, 行数组
    • 属性 int index : 行数组的当前行号
    • 构造函数 : 填充 size 行长度为 length 的随机字符串到行数组中
        private String content[];
        private int index;
    
        public FileMock(int size, int length) {
            content = new String[size];
            for (int i = 0; i < size; i++) {
                StringBuilder buffer = new StringBuilder(length);
                for (int j = 0; j < length; j++) {
                    int indice = (int) (Math.random() * 255);
                    buffer.append((char) indice);
                }
                content[i] = buffer.toString();
            }
            index = 0;
        }
    
    • hasMoreLines() : 判断文件中是否还有内容可供处理
        public boolean hasMoreLines() {
            return index < content.length;
        }
    
    • getLine() : 返回当前行并让行号加1。如果文件中没有内容供处理则返回 null
        public String getLine() {
            if (this.hasMoreLines()) {
                System.out.println("Mock: " + (content.length - index));
                return content[index++];
            }
            return null;
        }
    
  • 缓冲区 : Buffer

    • 属性 LinkedList<String> buffer : 存储共享数据
    • 属性 int maxSize : 缓冲区最大长度
    • 属性 boolean pendingLines : 标识是否还有内容可读入缓冲区
    • 属性 ReentrantLock lock : 控制对修改缓冲区的代码块的访问权限
    • 属性 Condition lines : 判断是否有数据
    • 属性 Condition space : 判断是否有空间
    • 构造函数初始化上述属性
        private LinkedList<String> buffer;
        private int maxSize;
        private boolean pendingLines;
    
        private ReentrantLock lock;
        private Condition lines;
        private Condition space;
    
        public Buffer(int maxSize) {
            this.maxSize = maxSize;
            buffer = new LinkedList<>();
            lock = new ReentrantLock();
            lines = lock.newCondition();
            space = lock.newCondition();
            pendingLines = true;
        }
    
    • insert() 方法 : 将String类型参数line的内容存入缓冲区
      1. 获得对锁的控制
      2. 检查缓冲区内是否有空闲空间
        • 如果缓冲区已满,则调用 space 条件的 await() 方法等待有空闲空间时被唤醒 当其他线程调用 space 条件的 signal() 或 signalAll() 方法时,(线程)将会被唤醒
      3. 有空闲空间或线程被唤醒时,将参数 line 的内容存入缓冲区并调用 lines 条件的 signalAll 方法 (将唤醒在等待缓冲区内有内容的所有线程)
      4. 最后别忘了释放锁
        public void insert(String line) {
            lock.lock();
            try {
                while (buffer.size() == maxSize) {
                    space.await();
                }
                buffer.offer(line);
                System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread().getName(), buffer.size());
                lines.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    • get() 方法 : 返回保存在缓冲区内数据的第一行
      1. 获得对锁的控制
      2. 检查缓冲区内是否有数据
        • 如果缓冲区是空的,调用 lines 条件的 await() 方法等待缓冲区内有数据时被唤醒。 当其他线程调用 lines 条件上的 signal() 或 signalAll() 方法时,此线程将会被唤醒
      3. 当缓冲区内有数据或线程被唤醒时,从缓冲区内获取第一行,调用 space 条件上的 signalAll() 方法
      4. 最后要释放对锁的控制
        public String get() {
            String line = null;
            lock.lock();
            try {
                while ((buffer.size() == 0) && (hasPendingLines())) {
                    lines.await();
                }
                if (hasPendingLines()) {
                    line = buffer.poll();
                    System.out.printf("%s: Line Readed: %d\n", Thread.currentThread().getName(), buffer.size());
                    space.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return line;
        }
    
    • setPendingLines : 供 Producer 调用以设置是否还有内容可供读入缓冲区
    • hasPendingLines : 是否还有内容可获取(可从文件读入或缓冲区自身还有内容)
        public void setPendingLines(boolean pendingLines) {
            this.pendingLines = pendingLines;
        }
    
        public boolean hasPendingLines() {
            return pendingLines || buffer.size() > 0;
        }
    
  • 生产者线程类 : Producer

    • 属性 FileMock mock, Buffer buffer 及构造函数
        private FileMock mock;
        private Buffer buffer;
    
        public Producer(FileMock mock, Buffer buffer) {
            this.mock = mock;
            this.buffer = buffer;
        }
    
    • run 方法 : 读取 FileMock 对象内的所有行使用缓冲区的 insert() 方法保存到缓冲区。 一结束就使用缓冲区的 setPendingLines() 方法告知缓冲区没有更多内容可供写入缓冲区了。
            buffer.setPendingLines(true);
            while (mock.hasMoreLines()) {
                String line = mock.getLine();
                buffer.insert(line);
            }
            buffer.setPendingLines(false);
    
  • 消费者线程类 : Consumer

    • 属性 Buffer buffer 及构造函数
        private Buffer buffer;
    
        public Consumer(Buffer buffer) {
            this.buffer = buffer;
        }
    
    • run 方法 : 当缓冲区还有内容可读,则尝试读取一行并处理
            while (buffer.hasPendingLines()) {
                String line = buffer.get();
                processLine(line);
            }
    
    • 辅助方法 processLine : 睡上 10 毫秒模拟进行某种处理
        private void processLine(String line) {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
  • 控制类 : Main

    • 创建一个 FileMock 对象和一个 Buffer 对象
        FileMock mock = new FileMock(100, 10);
        Buffer buffer = new Buffer(20);
    
    • 创建一个生产者,多个消费者及对应的线程
        Producer producer = new Producer(mock, buffer);
        Thread threadProducer = new Thread(producer, "Producer");
    
        Consumer consumers[] = new Consumer[3];
        Thread threadConsumers[] = new Thread[3];
    
        for (int i = 0; i < 3; i++) {
            consumers[i] = new Consumer(buffer);
            threadConsumers[i] = new Thread(consumers[i], "Consumer " + i);
        }
    
    • 启动生产者和消费者线程
        threadProducer.start();
        for (int i = 0; i < 3; i++) {
            threadConsumers[i].start();
        }
    

工作原理

所有的条件对象都和一个锁关联,使用 Lock 接口的 newCondition() 方法创建。 在能用条件做任何操作前,必须要拥有对此条件关联的锁的控制。 所以必须在调用一个锁对象的lock()方法开始、调用同一锁对象的unlock()方法结束的代码块内使用条件操作。

当一个线程调用一个条件的 await() 方法,它自动释放对锁的控制,这样另一个线程能获得锁并开始执行同样的操作或另一个由该锁保护的临界区。

当一个线程调用一个条件的 signal() 或 signalAll() 方法,在等待该条件的一个或所有线程都被唤醒,但是并不保证能满足让它们睡眠的条件已满足, 所以必须将 await() 调用放入一个 while 循环,直到条件满足了才离开循环。当条件不满足时,必须再次调用 await()。

您必须小心使用 await() 和 signal()。如果您调用一个条件的 await() 方法但不调用此条件的 signal()/signalAll() 方法,线程将永久睡眠。

线程可在调用 await() 方法后睡眠时被中断,所以您必须处理 InterruptedException 中断。

了解更多

Condition 接口还有其他版本的 await() 方法,如下:

  • await(long time, TimeUnit unit): 线程将睡眠直到:
    • 被中断
    • 另一个线程调用此条件内的 signal() 或 signalAll() 方法
    • 指定的时间之后
  • awaitUninterruptibly(): 线程将睡眠直到另一个线程调用 signal() 或 signalAll() 方法,不会被中断
  • awaitUntil(Date date): 线程将睡眠直到:
    • 被中断
    • 另一个线程调用此条件内的 signal() 或 signalAll() 方法
    • 到达指定日期

条件也可和读写锁的 ReadLock 和 WriteLock 一起使用。