Java 7 API 提供了几个数据结构用于并发应用。这里面,我们想强调以下2个数据结构:
LinkedTransferQueue 的元素是按抵达顺序存储的,所以越早到的越先被消耗。 有可能您想要开发的生产者/消费者程序,数据的消耗顺序是由优先级决定的而不是抵达时间。
在本节中,将学习如何实现一个数据结构用于生产者/消费者问题,此结构的元素将按它们的优先级排序。优先级高的元素会先被消耗。
本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect10 package中
MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> :
private AtomicInteger counter; private LinkedBlockingQueue<E> toTransfer; private ReentrantLock lock; public MyPriorityTransferQueue() { counter = new AtomicInteger(0); lock = new ReentrantLock(); toTransfer = new LinkedBlockingQueue<>(); }
@Override public boolean tryTransfer(E e) { lock.lock(); boolean value; if (counter.get() == 0) { value = false; } else { put(e); value = true; } lock.unlock(); return value; }
@Override public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); if (counter.get() != 0) { put(e); lock.unlock(); return true; } else { toTransfer.add(e); long newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); lock.unlock(); e.wait(newTimeout); lock.lock(); if (toTransfer.contains(e)) { toTransfer.remove(e); lock.unlock(); return false; } else { lock.unlock(); return true; } } }
@Override public void transfer(E e) throws InterruptedException { lock.lock(); if (counter.get() != 0) { put(e); lock.unlock(); } else { toTransfer.add(e); lock.unlock(); synchronized (e) { e.wait(); } } }
@Override public boolean hasWaitingConsumer() { return (counter.get() != 0); }
@Override public int getWaitingConsumerCount() { return counter.get(); }
@Override public E take() throws InterruptedException { lock.lock(); counter.incrementAndGet(); E value = toTransfer.poll(); if (value == null) { lock.unlock(); value = super.take(); lock.lock(); } else { synchronized (value) { value.notify(); } } counter.decrementAndGet(); lock.unlock(); return value; }
Event 类 : implements Comparable<Event>
private String thread; private int priority; public Event(String thread, int priority) { this.thread = thread; this.priority = priority; } public String getThread() { return thread; } public int getPriority() { return priority; }
@Override public int compareTo(Event e) { if (this.priority > e.getPriority()) { return -1; } else if (this.priority < e.getPriority()) { return 1; } else { return 0; } }
生产者 : Producer : implements Runnable
private MyPriorityTransferQueue<Event> buffer; public Producer(MyPriorityTransferQueue<Event> buffer) { this.buffer = buffer; }
@Override public void run() { for (int i = 0; i < 100; i++) { Event event = new Event(Thread.currentThread().getName(), i); buffer.put(event); } }
消费者 : Consumer : implements Runnable
private MyPriorityTransferQueue<Event> buffer; public Consumer(MyPriorityTransferQueue<Event> buffer) { this.buffer = buffer; }
@Override public void run() { for (int i = 0; i < 1002; i++) { try { Event value = buffer.take(); System.out.printf("Consumer: %s: %d\n", value.getThread(), value.getPriority()); } catch (InterruptedException e) { e.printStackTrace(); } } }
控制类 : Main
public static void main(String[] args) throws Exception { MyPriorityTransferQueue<Event> buffer = new MyPriorityTransferQueue<Event>(); Producer producer = new Producer(buffer); Thread producerThreads[] = new Thread[10]; for (int i = 0; i < producerThreads.length; i++) { producerThreads[i] = new Thread(producer); producerThreads[i].start(); } Consumer consumer = new Consumer(buffer); Thread consumerThread = new Thread(consumer); consumerThread.start(); System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount()); Event myEvent = new Event("Core Event", 0); buffer.transfer(myEvent); System.out.printf("Main: My Event has ben transfered.\n"); for (Thread producerThread : producerThreads) { try { producerThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } TimeUnit.SECONDS.sleep(1); System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount()); myEvent = new Event("Core Event 2", 0); buffer.transfer(myEvent); consumerThread.join(); System.out.printf("Main: End of the program\n"); }
在本节中,实现了 MyPriorityTransferQueue 数据结构。它被用于生产者/消费者问题,而且它的元素是按优先级排序,而非按到达顺序。 因为 Java 不运行多继承,所有第一件要决定的事情是 MyPriorityTransferQueue 类的基类。 本例扩展了 PriorityBlockingQueue 类,以实现按优先级插入元素到结构的操作。也实现了 TransferQueue 接口以增加与生产者/消费者有关的方法。
MyPriorityTransferQueue 类有以下3个属性:
在 MyPriorityTransferQueue 类中实现了一些方法。 所有在 TransferQueue 接口中声明的方法和在 PriorityBlockingQueue 类中实现的 take() 方法。 有2个方法在前面描述了,下面是剩下的:
实现了数据结构后,实现了 Event 类。它是存储在数据结构里的元素的类。 Event 类有2个属性存储 producer 的 ID 和事件的优先级,并因为数据结构的需要实现了 Comparable 接口。
然后实现了 Producer 和 Consumer 类。在本例中,有 10 个生产者和 1 个消费者,它们共享同一个缓冲区。 每个生产者生成 100 个优先级递增的事件,这样最高优先级的事件是最后生成的。
例子的主类创建一个 MyPriorityTransferQueue 对象,10 个生产者和一个消费者, 并用 MyPriorityTransferQueue 缓冲区的 transfer() 方法传输2个事件到缓冲区。