在计划线程池中运行定制任务

计划线程池是 Executor 框架的基础线程池的一个扩展。它让您可以计划任务在一段时间后被执行。 它由 ScheduledThreadPoolExecutor 类实现,允许执行以下两类任务:

  • 延时任务:在一段时间后只执行一次
  • 周期任务: 延时后执行然后周期执行

延时任务可以执行 Callable 和 Runnable 对象,但是周期任务只能执行 Runnable 对象。 被一个计划池执行的所有任务都是 RunnableScheduledFuture 接口的实现。

任务

实现 RunnableScheduledFuture 接口以执行延时和周期任务。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect06 package中

  • MyScheduledTask<V> : extends FutureTask<V> implements RunnableScheduledFuture<V>

    • 属性 task : RunnableScheduledFuture<V> 对象
    • 属性 executor : ScheduledThreadPoolExecutor 对象
    • 属性 period : long 类型 :每个周期的时长
    • 属性 startDate : long 类型 :开始时间
    • 构造函数
            private RunnableScheduledFuture<V> task;
            private ScheduledThreadPoolExecutor executor;
            private long period;
            private long startDate;
    
            /**
             * 构造函数
             *
             * @param runnable    将被一个任务执行的 Runnable 对象
             * @param result      任务的返回值
             * @param executor    将执行任务的 ScheduledThreadPoolExecutor 对象
             * @param task        被用于创建 MyScheduledTask 对象的 RunnableScheduledFuture 任务
             */
            public MyScheduledTask(Runnable runnable, V result,
                                   ScheduledThreadPoolExecutor executor, RunnableScheduledFuture<V> task) {
                super(runnable, result);
                this.executor = executor;
                this.task = task;
            }
    
    • isPeriodic() 方法: 判断是否是周期任务,如果是周期任务,则可能要规划下次执行的时间
      • 调用原始任务 task 的 isPeriodic() 方法
            @Override
            public boolean isPeriodic() {
                return task.isPeriodic();
            }
    
    • getDelay() 方法 :离下次执行还需要多少时间
      • 如果任务是一个周期任务且开始时间属性值不为0,则返回开始时间和实际时间的差值
        • 注意结果要转换成参数指定的时间单位
      • 否则返回task属性中的原始任务的延时
            @Override
            public long getDelay(TimeUnit unit) {
                if (!isPeriodic()) {
                    return task.getDelay(unit);
                } else {
                    if (startDate == 0) {
                        return task.getDelay(unit);
                    } else {
                        Date now = new Date();
                        long delay = startDate - now.getTime();
                        return unit.convert(delay, TimeUnit.MILLISECONDS);
                    }
                }
            }
    
    • compareTo() 方法 : 调用原始任务 task 的 compareTo() 方法
            @Override
            public int compareTo(Delayed o) {
                return task.compareTo(o);
            }
    
            @Override
            public boolean isPeriodic() {
                return task.isPeriodic();
            }
    
    • run() 方法
      1. 如果是周期任务
        1. 更新 startDate 属性为任务下次执行的开始时间(当前时间和周期时长的和)
        2. 将任务加回 ScheduledThreadPoolExecutor 对象的队列
      2. 输出当前时间和任务的一些信息
      3. 调用父类的 runAndReset() 方法后输出当前时间信息
            @Override
            public void run() {
                if (isPeriodic() && (!executor.isShutdown())) {
                    Date now = new Date();
                    startDate = now.getTime() + period;
                    executor.getQueue().add(this);
                }
    
                System.out.printf("Pre-MyScheduledTask: %s\n", new Date());
                System.out.printf("MyScheduledTask: Is Periodic: %s\n", isPeriodic());
                super.runAndReset();
                System.out.printf("Post-MyScheduledTask: %s\n", new Date());
            }
    
    • period 属性的 setter 方法
            public void setPeriod(long period) {
                this.period = period;
            }
    
  • MyScheduledThreadPoolExecutor : extends ScheduledThreadPoolExecutor : 用以执行 MyScheduledTask 任务

    • 构造函数
            public MyScheduledThreadPoolExecutor(int corePoolSize) {
                super(corePoolSize);
            }
    
    • decorateTask() 方法:
            /**
             * 创建并返回一个 MyScheduledTask<V>对象
             *
             * @param runnable  将被执行的 Runnable 对象
             * @param task      将执行 Runnable 对象的任务
             * @param <V>       泛型参数
             * @return          返回 MyScheduledTask<V> 对象
             */
            @Override
            protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
                                                                  RunnableScheduledFuture<V> task) {
                return new MyScheduledTask<>(runnable, null, this, task);
            }
    
    • scheduledAtFixedRate() 方法:调用父类方法后将返回对象转换为 MyScheduledTask 对象并设置任务的周期时长
            @Override
            public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                          long initialDelay, long period, TimeUnit unit) {
                ScheduledFuture<?> task = super.scheduleAtFixedRate(command, initialDelay, period, unit);
                MyScheduledTask<?> myTask = (MyScheduledTask<?>) task;
                myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period, unit));
                return task;
            }
    
  • 普通任务类 : Task : implements Runnable

    • run() 方法:睡上2秒,睡前睡后输出些信息
            @Override
            public void run() {
                System.out.printf("Task: Begin.\n");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.printf("Task: End.\n");
            }
    
  • 控制类 :Main

    1. 创建一个 MyScheduledThreadPoolExecutor 对象,线程池大小为 2
    2. 创建一个 Task 对象,调用执行者的 schedule() 方法提交并计划在 1 秒后运行
    3. 睡上 3 秒
    4. 创建一个 Task 对象,提交到执行者周期执行,1秒后开始执行,每次间隔时间 3 秒
    5. 睡上 10 秒
    6. shutdown 执行者后 awaitTermination 等待正在执行的任务结束
            public static void main(String[] args) throws Exception {
                MyScheduledThreadPoolExecutor executor = new MyScheduledThreadPoolExecutor(2);
    
                Task task = new Task();
                System.out.printf("Main: %s\n", new Date());
                executor.schedule(task, 1, TimeUnit.SECONDS);
    
                TimeUnit.SECONDS.sleep(3);
    
                task = new Task();
                System.out.printf("Main: %s\n", new Date());
                executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
    
                TimeUnit.SECONDS.sleep(10);
    
                executor.shutdown();
                executor.awaitTermination(1, TimeUnit.DAYS);
    
                System.out.printf("Main: End of the program.\n");
            }
    

讲解

本节中实现了 MyScheduledTask 类来实现一个定制的能在一个 ScheduledThreadPoolExecutor 执行者中执行的任务。 MyScheduledTask 类扩展 FutureTask 类并实现了 RunnableScheduledFuture 接口。 它实现 RunnableScheduledFuture 接口因为所有在一个计划执行者中被执行的任务都必须实现此接口并扩展 FutureTask 类 (此类提供 RunnableScheduledFuture 接口内声明的方法的健全的实现)。上述提到的接口和类都是泛型类,泛型参数是任务将返回数据的类型。

为了在一个计划执行者中使用一个 MyScheduledTask 任务,您覆盖了 MyScheduledThreadPoolExecutor 类中的 decorateTask() 方法。 此类扩展 ScheduledThreadPoolExecutor 执行者,decorateTask() 方法提供机制来转换ScheduledThreadPoolExecutor中的默认的计划任务为 MyScheduledTask 任务。 这样,当您实现您自己版本的计划任务时,您必须实现您自己版本的计划执行者。

  • decorateTask() 方法用参数简单地创建一个新的 MyScheduledTask 对象。
    • 一个 Runnable 对象:将在任务中被执行
    • 一个结果对象:将被返回(在本例中,任务不会返回结果,所以用了 null 值)
    • 将执行任务的执行者。在本例中,用 this 关键字来引用正创建任务的执行者。
    • 用来执行 Runnable 对象的原始任务。在池子中新的对象将替换此任务

getDelay() 方法被计划执行者调用以获知是否要执行一个任务。此方法的行为在延时和周期任务中是不同的。 MyScheduledClass类的构造函数接受原始的将执行 Runnable 对象的 ScheduledRunnableFuture 对象作为参数,并将其保存为类的一个属性以访问它的方法和数据。 当要执行一个延时任务,getDelay() 方法访问原始任务的延时时间,但是周期任务的情况下,getDelay() 方法返回 startDate 属性和实际时间之间的时差。

run() 方法执行任务。周期任务的一个特性是如果任务要再次执行,必须将任务的下一次执行作为一个新的任务放到执行者的队列中。 所以如果是执行一个周期任务,设置startDate属性值为当前实际时间加上任务执行的周期(startDate属性保存任务下次执行的开始时间),并将任务再次保存到执行者的队列中。 接着,用FutureTask类提供的 runAndReset() 方法执行任务。 如果是延时任务,则不需要将它们放到执行者的队列中,因为它们只执行一次。

  • 您也要考虑执行者被停止的情况。此时,不必再次将周期任务保存到执行者的队列中。

最后,覆盖了 MyScheduledThreadPoolExecutor 类中的 scheduleAtFixedRate() 方法。 前面提到对于周期任务,用任务的周期设置 startDate 属性的值,但还没有初始化任务的周期。 必须覆盖此方法,该方法接受周期时间作为一个参数,传递给 MyScheduledTask 类这样它就能用了。

例子结尾实现了 Runnable 接口的 Task 类。此类在计划执行者中被执行。 例子的 Main 类创建一个 MyScheduledThreadPoolExecutor 执行者并将以下2个任务发送到执行者:

  • 延时任务:1秒后执行
  • 周期任务:第1次在1秒后执行,然后每3秒执行1次

了解更多

ScheduledThreadPoolExecutor 类提供了另一个版本的 decorateTask() 方法,接受一个 Callable 对象作为参数,而不是一个 Runnable 对象。