实现一个基于优先级的执行者

在内部,一个执行者使用一个阻塞队列来保存等待中的任务。保存的顺序是按到达执行者的顺序。 一个可能的替代是用一个优先级队列来保存新任务。 这样,如果一个有高优先级的任务到达执行者,将被在其他已到达执行者等待执行但优先级较低的线程前执行。

任务

实现一个用优先级队列来保存发送到它那里执行的任务的执行者。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect03 package中

  • 线程类 : MyPriorityTask : 实现 Runnable 接口和 Comparable<MyPriorityTask> 接口

    • 属性 priority : int 类型 :优先级
    • 属性 name : String 对象:任务名称
    • 在构造函数中初始化属性为参数值
    • priority 属性的 getter 方法
        private int priority;
        private String name;
    
        public MyPriorityTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    
        public int getPriority() {
            return priority;
        }
    
    • compareTo() 方法: priority 值大的优先级高
        @Override
        public int compareTo(MyPriorityTask o) {
            if (this.getPriority() < o.getPriority()) {
                return 1;
            }
            if (this.getPriority() > o.getPriority()) {
                return -1;
            }
            return 0;
        }
    
    • run() 方法: 输出任务信息后睡上 2 秒
        @Override
        public void run() {
            System.out.printf("MyPriorityTask: %s Priority : %d\n",name,priority);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
  • 控制类 : Main

    1. 创建一个线程池 executor,注意最大线程数为2,使用 PriorityBlockingQueue<Runnable> 对象来保存等待中的队列
    2. 提交4个 MyPriorityTask 到 executor 执行,优先级从0到4
    3. 睡上 1 秒
    4. 提交4个 MyPriorityTask 到 executor 执行,优先级从5到8
    5. shutdown executor
    6. 等待所有任务完成后输出结束信息
            ThreadPoolExecutor executor=new ThreadPoolExecutor(
                    2,2,1, TimeUnit.SECONDS,new PriorityBlockingQueue<Runnable>()
            );
    
            for (int i=0; i<4; i++){
                MyPriorityTask task=new MyPriorityTask ("Task "+i,i);
                executor.execute(task);
            }
    
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            for (int i=4; i<8; i++) {
                MyPriorityTask task=new MyPriorityTask ("Task "+i,i);
                executor.execute(task);
            }
    
            executor.shutdown();
    
            try {
                executor.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.printf("Main: End of the program.\n");
    

讲解

将一个 executor 转为基于优先级很简单。只要传一个 PriorityBlockingQueue 对象(泛型参数为 Runnable 接口)作为参数。 注意存储在优先级队列中的所有对象都必须实现 Comparable 接口。

您实现了 MyPriorityTask 类,该类实现了 Runnable 接口成为了一个任务,实现了Comparable接口以被保存到优先级队列。 任务的优先级属性值越大,将被越早执行。compareTo() 方法决定了任务在优先级队列中的顺序。 在 Main 类中,提交了8个有不同优先级的任务到 executor。 第一个送到的任务最先被执行。因为第一个任务到达时 executor正闲着,就立即执行了。 因为创建的 executor 有2个执行线程,所以最开始2个任务将是最先执行的。接着,其余的任务将按优先级被执行。

了解更多

您可配置 Executor 使用 BlockingQueue 接口的任意实现。一个有趣的实现是 DelayQueue。 此类被用来促成你好延迟激活的元素。它提供了方法只返回活跃的对象。 您可所使用此类来实现您自己版本的 ScheduledThreadPoolExecutor 类。