实现一个基于优先级的传输队列

Java 7 API 提供了几个数据结构用于并发应用。这里面,我们想强调以下2个数据结构:

  • LinkedTransferQueue: 建议把此数据结构用于那些有生产者/消费者结构的程序。
    在那些程序中,有一个或多个数据生产者和一个或多个数据消费者以及一个它们一起共享的数据结构。
    • 生产者将数据放入数据结构,消费者从数据结构中拿取数据。
    • 如果数据结构是空的,消费者被阻塞直到有数据可消费;
    • 如果数据结构是满的,生产者被阻塞直到有空间可放入数据
  • PriorityBlockingQueue:在这个数据结构中,元素是按顺序存储的。
    • 元素必须实现有 compareTo() 方法的 Comparable 接口
    • 当插入元素到结构时,与结构中的元素比较,直到找到它的位置

LinkedTransferQueue 的元素是按抵达顺序存储的,所以越早到的越先被消耗。 有可能您想要开发的生产者/消费者程序,数据的消耗顺序是由优先级决定的而不是抵达时间。

在本节中,将学习如何实现一个数据结构用于生产者/消费者问题,此结构的元素将按它们的优先级排序。优先级高的元素会先被消耗。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect10 package中

  • MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E> implements TransferQueue<E> :

    • 属性 counter : AtomicInteger 对象 : 等待中的消费者数量
    • 属性 toTransfer : LinkedBlockingQueue<E> : 待消耗的元素队列
    • 属性 lock : ReentrantLock 对象
    • 在构造函数中初始化属性
            private AtomicInteger counter;
            private LinkedBlockingQueue<E> toTransfer;
            private ReentrantLock lock;
    
            public MyPriorityTransferQueue() {
                counter = new AtomicInteger(0);
                lock = new ReentrantLock();
                toTransfer = new LinkedBlockingQueue<>();
            }
    
    • tryTransfer() 方法 :
      • 如果有消费者在等待,立即将元素送给等待中的消费者。
      • 否则返回 false
            @Override
            public boolean tryTransfer(E e) {
                lock.lock();
                boolean value;
                if (counter.get() == 0) {
                    value = false;
                } else {
                    put(e);
                    value = true;
                }
                lock.unlock();
                return value;
            }
    
    • tryTransfer(E e, long timeout, TimeUnit unit) 方法
      • 如果有消费者在等待,立即将元素送给等待中的消费者
      • 否则将指定时间转换成毫秒值,使用 wait() 方法让线程睡眠指定时长
        • 您等会将看到,当消费者拿走元素,如果有线程在 wait() 方法中睡眠,则将用 notify() 方法唤醒它
            @Override
            public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
                lock.lock();
                if (counter.get() != 0) {
                    put(e);
                    lock.unlock();
                    return true;
                } else {
                    toTransfer.add(e);
                    long newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.unlock();
    
                    e.wait(newTimeout);
    
                    lock.lock();
                    if (toTransfer.contains(e)) {
                        toTransfer.remove(e);
                        lock.unlock();
                        return false;
                    } else {
                        lock.unlock();
                        return true;
                    }
                }
            }
    
    • transfer() 方法:如果可能,尝试立即将元素送给一个等待中的消费者
      • 如果尚未有消费者在等待,则:
        • 将元素储存到一个特定的队列以发送给第一个尝试获取元素的消费者
        • 阻塞线程直到元素被消耗
            @Override
            public void transfer(E e) throws InterruptedException {
                lock.lock();
                if (counter.get() != 0) {
                    put(e);
                    lock.unlock();
                } else {
                    toTransfer.add(e);
                    lock.unlock();
                    synchronized (e) {
                        e.wait();
                    }
                }
            }
    
    • hasWaitingConsumer() 方法: 使用 counter 属性值来计算方法的返回值
      • 如果 counter 值大于0,返回true
      • 否则返回 false
            @Override
            public boolean hasWaitingConsumer() {
                return (counter.get() != 0);
            }
    
    • getWaitingConsumerCount() 方法:返回 counter 属性值
            @Override
            public int getWaitingConsumerCount() {
                return counter.get();
            }
    
    • take() 方法:消费者要消耗一个元素时调用此方法。
      1. 获得锁并增加等待中的消费者数目
      2. 调用 poll() 方法从 toTransfer 队列中获取元素
        • 如果没有元素,则:
          1. 释放锁;
          2. 尝试用父类的take()方法从自身队列中获取元素;(如果自身队列中没有元素,此方法将让线程睡到有元素可被消耗)
          3. 再次获取锁
        • 否则将此元素从 toTransfer 队列中拿走,并唤醒在等待消耗该元素的线程(如果有的话)
      3. 最后减少等待中的消费者数目并释放锁
            @Override
            public E take() throws InterruptedException {
                lock.lock();
                counter.incrementAndGet();
                E value = toTransfer.poll();
                if (value == null) {
                    lock.unlock();
                    value = super.take();
                    lock.lock();
                } else {
                    synchronized (value) {
                        value.notify();
                    }
                }
                counter.decrementAndGet();
                lock.unlock();
    
                return value;
            }
    
  • Event 类 : implements Comparable<Event>

    • 属性 thread : String 对象 : 创建此事件的线程名称
    • 属性 priority : int 类型 : 优先级
    • 在构造函数中初始化属性值为参数值
    • 属性的 getter 方法
            private String thread;
            private int priority;
    
            public Event(String thread, int priority) {
                this.thread = thread;
                this.priority = priority;
            }
    
            public String getThread() {
                return thread;
            }
    
            public int getPriority() {
                return priority;
            }
    
    • compareTo() 方法 :比较实际 Event 和参数 Event。
      • -1 :实际 Event 的 priority 属性值大于参数的
      • 1 :小于
      • 0 :等于
      • 以上逻辑将实现一个倒序的列表。高优先级的事件将保存在队列的前面。
            @Override
            public int compareTo(Event e) {
                if (this.priority > e.getPriority()) {
                    return -1;
                } else if (this.priority < e.getPriority()) {
                    return 1;
                } else {
                    return 0;
                }
            }
    
  • 生产者 : Producer : implements Runnable

    • 属性 buffer : MyPriorityTransferQueue<Event> 对象: 保存此生产者产生的事件
    • 构造函数
            private MyPriorityTransferQueue<Event> buffer;
    
            public Producer(MyPriorityTransferQueue<Event> buffer) {
                this.buffer = buffer;
            }
    
    • run() 方法: 创建 100 个 Event 对象,使用创建顺序作为优先级(最后创建的事件将有最高优先级)并用 put() 方法插入队列
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    Event event = new Event(Thread.currentThread().getName(), i);
                    buffer.put(event);
                }
            }
    
  • 消费者 : Consumer : implements Runnable

    • 属性 buffer : MyPriorityTransferQueue<Event> 对象: 从 buffer 中获取被消耗的事件
    • 构造函数
            private MyPriorityTransferQueue<Event> buffer;
    
            public Consumer(MyPriorityTransferQueue<Event> buffer) {
                this.buffer = buffer;
            }
    
    • run() 方法 : 使用 take() 方法消耗 1002 个事件(例子中生成的所有事件),并将生成事件的线程数和优先级信息输出到终端
            @Override
            public void run() {
                for (int i = 0; i < 1002; i++) {
                    try {
                        Event value = buffer.take();
                        System.out.printf("Consumer: %s: %d\n", value.getThread(), value.getPriority());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
  • 控制类 : Main

    1. 创建一个 MyPriorityTransferQueue<Event> 对象 buffer
    2. 创建 10 个 Producer 对象及对应的线程,并启动线程
    3. 创建 1 个 Consumer 对象及对应的线程,并启动线程
    4. 输出当前等待中的消费者数量
    5. 用 transfer() 方法将一个事件传输给消费者
    6. 使用 Thread 的 join() 方法等待生产者们结束运行
    7. 休息 1 秒
    8. 用 transfer() 方法将另一个事件传输给消费者
    9. 等待消费者线程结束后,输出程序结束的信息
            public static void main(String[] args) throws Exception {
                MyPriorityTransferQueue<Event> buffer = new MyPriorityTransferQueue<Event>();
    
                Producer producer = new Producer(buffer);
                Thread producerThreads[] = new Thread[10];
                for (int i = 0; i < producerThreads.length; i++) {
                    producerThreads[i] = new Thread(producer);
                    producerThreads[i].start();
                }
    
                Consumer consumer = new Consumer(buffer);
                Thread consumerThread = new Thread(consumer);
                consumerThread.start();
    
                System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount());
    
                Event myEvent = new Event("Core Event", 0);
                buffer.transfer(myEvent);
                System.out.printf("Main: My Event has ben transfered.\n");
    
                for (Thread producerThread : producerThreads) {
                    try {
                        producerThread.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                TimeUnit.SECONDS.sleep(1);
                System.out.printf("Main: Buffer: Consumer count: %d\n", buffer.getWaitingConsumerCount());
    
                myEvent = new Event("Core Event 2", 0);
                buffer.transfer(myEvent);
    
                consumerThread.join();
                System.out.printf("Main: End of the program\n");
    
            }
    

讲解

在本节中,实现了 MyPriorityTransferQueue 数据结构。它被用于生产者/消费者问题,而且它的元素是按优先级排序,而非按到达顺序。 因为 Java 不运行多继承,所有第一件要决定的事情是 MyPriorityTransferQueue 类的基类。 本例扩展了 PriorityBlockingQueue 类,以实现按优先级插入元素到结构的操作。也实现了 TransferQueue 接口以增加与生产者/消费者有关的方法。

MyPriorityTransferQueue 类有以下3个属性:

  • AtomicInteger 对象 counter: 此属性保存正等待从数据结构拿一个元素的消费者数量。
    • 当一个消费者调用 take() 操作来从数据结构获取一个元素,此计数器加1.
    • 当一个消费者结束 take() 操作,计数器减1
    • 此计数器被用于 hasWaitingConsumer() 和 getWaitingConsumerCount() 方法的实现中。
  • ReentrantLock 对象 lock: 此属性被用来控制对实现的操作的访问。同时只有一个线程能操作此数据结构。
  • LinkedBlockingQueue 对象 list: 保存被传输的元素

在 MyPriorityTransferQueue 类中实现了一些方法。 所有在 TransferQueue 接口中声明的方法和在 PriorityBlockingQueue 类中实现的 take() 方法。 有2个方法在前面描述了,下面是剩下的:

  • tryTransfer(E e): 尝试将一个元素直接发送给一个消费者。
    • 如果有一个消费者在等待,此方法将元素存入优先级队列以被消费者立即消耗,然后返回 true。
    • 如果没有消费者在等待,此方法返回 false
  • tryTransfer(E e, long timeout, TimeUnit unit): 类似 tryTransfer 方法
    • 在没有消费者等待时,阻塞线程,阻塞时长由参数决定。
    • 当线程在睡眠,必须先释放锁,否则会阻塞队列
  • transfer(E e):将一个元素直接传给一个消费者。
    • 如果有一个消费者在等待,此方法将元素存入优先级队列以被消费者立即消耗;
    • 否则元素被存入待传输元素列表,线程被阻塞直到元素被消耗
    • 线程进入睡眠时,必须释放锁;否则会阻塞队列。
  • take(): 此方法返回将被消耗的下一个元素
    • 如果在待传输元素列表中有元素,则待消耗元素从此列表中获取
    • 否则从优先级队列中获取



实现了数据结构后,实现了 Event 类。它是存储在数据结构里的元素的类。 Event 类有2个属性存储 producer 的 ID 和事件的优先级,并因为数据结构的需要实现了 Comparable 接口。

然后实现了 Producer 和 Consumer 类。在本例中,有 10 个生产者和 1 个消费者,它们共享同一个缓冲区。 每个生产者生成 100 个优先级递增的事件,这样最高优先级的事件是最后生成的。

例子的主类创建一个 MyPriorityTransferQueue 对象,10 个生产者和一个消费者, 并用 MyPriorityTransferQueue 缓冲区的 transfer() 方法传输2个事件到缓冲区。