在执行者中分离任务启动与结果处理

通常当您使用一个执行者来执行并发任务,您会把 Runnable 或 Callable 任务送到 executor 并获得 Future 对象来控制。 有时,您会需要将任务在一个对象中送给执行者并在另一个对象中处理结果。 为此,Java Concurrency API 提供了 CompletionService 类。

CompletionService 类有一个方法将任务送给一个执行者,另一个方法为下一个已完成的任务获得Future对象。 本质上它使用一个 Executor 对象来执行任务。 此行为有共享一个 CompletionService 对象的好处,并且将任务送到执行者这样其它对象能处理结果。 限制是第二个对象只能获得那些已经执行完了的任务的 Future 对象,所以这些 Future 对象只能被用来获得任务的结果。

任务

学习使用 CompletionService 在一个执行者中分离任务启动与结果处理。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt4.sect11 package中

  • Callable<String> 类 : ReportGenerator : 负责生成报表

    • 属性 sender : String 对象
    • 属性 title : String 对象
    • 在构造函数中初始化属性为参数值
        private String sender;
        private String title;
    
        public ReportGenerator(String sender, String title) {
            this.sender = sender;
            this.title = title;
        }
    
    • call() 方法 : 睡上几秒后生成报表的内容(只是简单地把 sender 和 title 拼接起来)
        @Override
        public String call() throws Exception {
            try {
                Long duration = (long) (Math.random() * 10);
                System.out.printf(
                        "%s_%s: ReportGenerator: Generating a report during %d seconds\n",
                        this.sender, this.title, duration
                );
                TimeUnit.SECONDS.sleep(duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return sender + ": " + title;
        }
    
  • 线程类 ReportRequest : 请求生成报表(向 CompletionService 发送 ReportGenerator 任务)

    • 属性 name : String 类型
    • 属性 service : CompletionService<String>
    • 在构造函数中初始化属性为参数值
        private String name;
    
        private CompletionService<String> service;
    
        public ReportRequest(String name, CompletionService<String> service) {
            this.name = name;
            this.service = service;
        }
    
    • run() 方法
      • 创建3个 ReportGenerator 对象并用 submit 方法将其发送给 CompletionService
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                ReportGenerator reportGenerator = new ReportGenerator(name, "Report");
                service.submit(reportGenerator);
            }
        }
    
  • 线程类 ReportProcessor : 对报表进行处理(从 CompletionService 获取 Future 结果并处理)

    • 属性 service : CompletionService<String> 对象
    • 属性 end : boolean 类型
    • 在构造函数中初始化service属性为参数值,end为false
        private CompletionService<String> service;
        private boolean end;
    
        public ReportProcessor(CompletionService<String> service) {
            this.service = service;
            end = false;
        }
    
    • run() 方法
      1. 当 end 属性为 false 时,调用 CompletionService 接口的 poll() 方法获得下一个完成的任务的结果(Future 对象)
      2. 使用 Future 对象的 get() 方法获得任务的结果并输出到终端
        @Override
        public void run() {
            while (!end) {
                try {
                    Future<String> result = service.poll(20, TimeUnit.SECONDS);
                    if (result != null) {
                        String report = result.get();
                        System.out.printf("ReportReceiver: Report Received: %s\n", report);
                    }
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.printf("ReportSender: End\n");
        }
    
    • 提供 end 属性的 setter 方法使得可修改 end 属性值
        public void setEnd(boolean end) {
            this.end = end;
        }
    
  • 控制类 : Main

    • 使用 Executors 类的 newCachedThreadPool() 方法创建一个 ExecutorService
        ExecutorService executor = Executors.newCachedThreadPool();
    
    • 创建一个 CompletionService,它将使用上一步创建的 executor 生成线程处理发过来的任务
        CompletionService<String> service = new ExecutorCompletionService<>(executor);
    
    • 创建两个 ReportRequest 对象及对应执行它们的线程 (ReportRequest将把ReportGenerator任务发给service处理)
        ReportRequest faceRequest = new ReportRequest("Face", service);
        ReportRequest onlineRequest = new ReportRequest("Online", service);
    
        Thread faceThread = new Thread(faceRequest);
        Thread onlineThread = new Thread(onlineRequest);
    
    • 创建一个 ReportProcess 对象及对应的线程 (ReportProcess将从service获取报表内容进行处理)
        ReportProcessor processor = new ReportProcessor(service);
        Thread senderThread = new Thread(processor);
    
    • 启动上面 3 个线程并等待两个 ReportRequest 对象结束
        System.out.printf("Main: Starting the Threads\n");
        faceThread.start();
        onlineThread.start();
        senderThread.start();
    
        try {
            System.out.printf("Main: Waiting for the report generators.\n");
            faceThread.join();
            onlineThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
    • 使用 shutdown() 方法停止执行者 并 使用 awaitTermination() 方法等待任务的结束
        System.out.printf("Main: Shutting down the executor.\n");
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    
    • 设置 ReportSender 对象的 end 属性的值使其结束
        processor.setEnd(true);
        System.out.println("Main: Ends");
    

工作原理

在 Main 类中,使用 Executors 类的 newCachedThreadPool() 方法创建 ThreadPoolExecutor。 然后使用该对象初始化一个 CompletionService 对象,因为 CompletionService 使用一个执行者来执行它的任务。 如 ReportRequest 类中一样,使用 submit() 方法让 CompletionService 执行任务。

当 CompletionService 结束了其中一个任务的执行,它将用来控制任务执行的 Future 对象存入一个队列。 poll() 方法访问此队列看是否有任务已经完成了它的执行,如果有,则将队列的第一个元素(控制该已完成任务的Future对象)返回。 并将其从队列中删除。 在本例中,poll()方法的2个参数表示在已完成任务的结果队列为空时,希望等多长时间直到任务完成。

CompletionService 对象被创建后,创建了 2 个 ReportRequest 对象,每个提供 3 个 ReportGenerator 任务给 CompletionService 执行。 一个 ReportSender 任务将处理 2 个 ReportRequest 对象发送的任务产生的结果。

了解更多

CompletionService 类能执行 Callable 或 Runnable 任务。本例使用的是 Callable,但也可以使用 Runnable 对象。 因为 Runnable 对象不产生结果,CompletionService 类哲学上并不宜应用到此场景。

CompletionService 类提供了另外 2 个方法来获得已完成任务的 Future 对象。这些方法如下:

  • poll() : 没有参数的 poll() 方法检查队列中是否有 Future 对象。
    • 如果队列为空,则立即返回 null
    • 否则返回第一个元素并将其从队列移除
  • take() : 此方法没有参数。检查队列中是否有 Future 对象。
    • 如果队列为空,则阻塞线程直到队列有元素
    • 当队列有元素时,返回第一个元素并将其从队列中删除