锁可以和一个或多个条件相关联。在 Condition 接口中声明这些条件。 这些条件的目的是允许线程拥有对锁的控制并检查条件是否满足。如果不满足则挂起直到另一个线程唤醒它们。 Condition 接口提供了挂起线程和唤醒被挂起线程的机制。
(模拟)物理文件 : FileMock
private String content[]; private int index; public FileMock(int size, int length) { content = new String[size]; for (int i = 0; i < size; i++) { StringBuilder buffer = new StringBuilder(length); for (int j = 0; j < length; j++) { int indice = (int) (Math.random() * 255); buffer.append((char) indice); } content[i] = buffer.toString(); } index = 0; }
public boolean hasMoreLines() { return index < content.length; }
public String getLine() { if (this.hasMoreLines()) { System.out.println("Mock: " + (content.length - index)); return content[index++]; } return null; }
缓冲区 : Buffer
private LinkedList<String> buffer; private int maxSize; private boolean pendingLines; private ReentrantLock lock; private Condition lines; private Condition space; public Buffer(int maxSize) { this.maxSize = maxSize; buffer = new LinkedList<>(); lock = new ReentrantLock(); lines = lock.newCondition(); space = lock.newCondition(); pendingLines = true; }
public void insert(String line) { lock.lock(); try { while (buffer.size() == maxSize) { space.await(); } buffer.offer(line); System.out.printf("%s: Inserted Line: %d\n", Thread.currentThread().getName(), buffer.size()); lines.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public String get() { String line = null; lock.lock(); try { while ((buffer.size() == 0) && (hasPendingLines())) { lines.await(); } if (hasPendingLines()) { line = buffer.poll(); System.out.printf("%s: Line Readed: %d\n", Thread.currentThread().getName(), buffer.size()); space.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return line; }
public void setPendingLines(boolean pendingLines) { this.pendingLines = pendingLines; } public boolean hasPendingLines() { return pendingLines || buffer.size() > 0; }
生产者线程类 : Producer
private FileMock mock; private Buffer buffer; public Producer(FileMock mock, Buffer buffer) { this.mock = mock; this.buffer = buffer; }
buffer.setPendingLines(true); while (mock.hasMoreLines()) { String line = mock.getLine(); buffer.insert(line); } buffer.setPendingLines(false);
消费者线程类 : Consumer
private Buffer buffer; public Consumer(Buffer buffer) { this.buffer = buffer; }
while (buffer.hasPendingLines()) { String line = buffer.get(); processLine(line); }
private void processLine(String line) { try { Random random = new Random(); Thread.sleep(random.nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } }
控制类 : Main
FileMock mock = new FileMock(100, 10); Buffer buffer = new Buffer(20);
Producer producer = new Producer(mock, buffer); Thread threadProducer = new Thread(producer, "Producer"); Consumer consumers[] = new Consumer[3]; Thread threadConsumers[] = new Thread[3]; for (int i = 0; i < 3; i++) { consumers[i] = new Consumer(buffer); threadConsumers[i] = new Thread(consumers[i], "Consumer " + i); }
threadProducer.start(); for (int i = 0; i < 3; i++) { threadConsumers[i].start(); }
所有的条件对象都和一个锁关联,使用 Lock 接口的 newCondition() 方法创建。 在能用条件做任何操作前,必须要拥有对此条件关联的锁的控制。 所以必须在调用一个锁对象的lock()方法开始、调用同一锁对象的unlock()方法结束的代码块内使用条件操作。
当一个线程调用一个条件的 await() 方法,它自动释放对锁的控制,这样另一个线程能获得锁并开始执行同样的操作或另一个由该锁保护的临界区。
当一个线程调用一个条件的 signal() 或 signalAll() 方法,在等待该条件的一个或所有线程都被唤醒,但是并不保证能满足让它们睡眠的条件已满足, 所以必须将 await() 调用放入一个 while 循环,直到条件满足了才离开循环。当条件不满足时,必须再次调用 await()。
您必须小心使用 await() 和 signal()。如果您调用一个条件的 await() 方法但不调用此条件的 signal()/signalAll() 方法,线程将永久睡眠。
线程可在调用 await() 方法后睡眠时被中断,所以您必须处理 InterruptedException 中断。
Condition 接口还有其他版本的 await() 方法,如下:
条件也可和读写锁的 ReadLock 和 WriteLock 一起使用。