使用按优先级排序的阻塞式线程安全列表

使用数据结果的一个典型需求是排序列表。Java 为此提供了 PriorityBlockingQueue 。

您要加到 PriorityBlockingQueue 的所有元素必须实现 Comparable 接口。此接口有一个方法 compareTo() 。 此方法接受一个相同类型的对象作为参数。所以有2个对象进行比较,一个是执行此方法的对象(本地对象),一个是参数。 此方法的返回值:

  • < 0: 本地对象比参数小
  • > 0: 本地对象比参数大
  • 0 : 2个对象相等

当插入一个元素到 PriorityBlockingQueue 时,PriorityBlockingQueue 用 compareTo() 方法来决定插入元素的位置。 较大的元素将被放在队列的尾部。

PriorityBlockingQueue 的另一个重要特点是它是一个阻塞式数据结构。它有些方法在不能立即执行操作时,阻塞线程直到能执行时。

任务

使用 PriorityBlockingQueue 保存许多有不同优先级的事件到同一个列表,以检查 PriorityBlockingQueue 是否如预期的排好序。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt6.sect04 package中

  • 数据类 : Event : 实现 Comparable<Event> 接口

    • 属性 thread : int : 记录是由哪个线程创建的此事件
    • 属性 priority : int : 优先级
    • 构造函数初始化属性为参数值
        private int thread;
        private int priority;
    
        public Event(int thread, int priority) {
            this.thread = thread;
            this.priority = priority;
        }
    
    • 属性的 getter 方法
        public int getThread() {
            return thread;
        }
    
        public int getPriority() {
            return priority;
        }
    
    • compareTo 方法:倒序比较
        @Override
        public int compareTo(Event e) {
            if (this.priority > e.getPriority()) {
                return -1;
            } else if (this.priority < e.getPriority()) {
                return 1;
            } else {
                return 0;
            }
        }
    
  • Runnable 实现类: Task

    • 属性 id : int : 此任务的标识号
    • 属性 queue : PriorityBlockingQueue<Event> 对象 :保存任务生成的事件
    • 构造函数初始化属性为参数值
        private int id;
        private PriorityBlockingQueue<Event> queue;
    
        public Task(int id, PriorityBlockingQueue<Event> queue) {
            this.id = id;
            this.queue = queue;
        }
    
    • run() 方法 :插入1000个事件到队列 (优先级逐个变大)
      • 注意 Event 的 compareTo 是倒序排序的,期望的最终顺序和插入顺序是不一致的
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                Event event = new Event(id, i);
                queue.add(event);
            }
        }
    
  • 控制类 :Main

    1. 创建一个 PriorityBlockingQueue 对象 queue,泛型参数为 Event 类
        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();
    
    1. 创建几个 Task 对象及对应的线程
        Thread taskThreads[] = new Thread[5];
    
        for (int i = 0; i < taskThreads.length; i++) {
            Task task = new Task(i, queue);
            taskThreads[i] = new Thread(task);
        }
    
    1. 启动线程并等待线程结束
        for (Thread taskThread : taskThreads) {
            taskThread.start();
        }
    
        for (Thread taskThread : taskThreads) {
            try {
                taskThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    1. 输出队列大小信息
    2. 调用 poll() 方法从队列逐个获取并移除事件、输出事件信息
    3. 输出队列大小信息以校验队列大小正确
        System.out.printf("Main: Queue Size: %d\n", queue.size());
        for (int i = 0; i < taskThreads.length * 1000; i++) {
            Event event = queue.poll();
            System.out.printf("Thread %s: Priority %d\n", event.getThread(), event.getPriority());
        }
    
        System.out.printf("Main: Queue Size: %d\n", queue.size());
        System.out.printf("Main: End of the program\n");
    

讲解

本例用 PriorityBlockingQueue 实现了一个事件对象的优先级队列。 如在介绍提到的,存储在 PriorityBlockingQueue 中的元素都必须实现 Comparable 接口。 所以您必须在 Event 类中实现 compareTo() 方法。

事件有优先级(priority)属性。priority 值大的将排在队列的前面。 实现 compareTo() 方法时,如果本地对象的优先级高过参数的优先级,则返回 -1;否则返回1;如果一样则返回0(此时 PriorityBlockingQueue 不保证元素的顺序)。

本例实现了 Task 类来添加 Event 对象到优先级队列。每个 task 对象用 add() 方法往队列添加 1000 个事件,优先级从 0 到 999。

Main 类的 main() 方法创建了5个 Task 对象并在对应的线程中执行。所有线程结束执行后,将所有元素输出到终端。 为了从队列获取元素,使用了 poll() 方法。此方法从队列返回并移除第一个元素。

了解更多

PriorityBlockingQueue 类还有其他一些有趣的方法,如下:

  • clear() : 从队列移除所有元素
  • take() : 返回并移除队列的第一个元素。如果队列为空,此方法阻塞线程直到队列有元素。
  • put(E e) : E 是 PriorityBlockingQueue 类的泛型参数。此方法将作为一个参数传递的元素插入到队列。
  • peek() : 此方法返回队列的第一个元素,但是不移除它。