通常当您使用一个执行者来执行并发任务,您会把 Runnable 或 Callable 任务送到 executor 并获得 Future 对象来控制。 有时,您会需要将任务在一个对象中送给执行者并在另一个对象中处理结果。 为此,Java Concurrency API 提供了 CompletionService 类。
CompletionService 类有一个方法将任务送给一个执行者,另一个方法为下一个已完成的任务获得Future对象。 本质上它使用一个 Executor 对象来执行任务。 此行为有共享一个 CompletionService 对象的好处,并且将任务送到执行者这样其它对象能处理结果。 限制是第二个对象只能获得那些已经执行完了的任务的 Future 对象,所以这些 Future 对象只能被用来获得任务的结果。
本节的示例代码在 com.elanzone.books.noteeg.chpt4.sect11 package中
Callable<String> 类 : ReportGenerator : 负责生成报表
private String sender; private String title; public ReportGenerator(String sender, String title) { this.sender = sender; this.title = title; }
@Override public String call() throws Exception { try { Long duration = (long) (Math.random() * 10); System.out.printf( "%s_%s: ReportGenerator: Generating a report during %d seconds\n", this.sender, this.title, duration ); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } return sender + ": " + title; }
线程类 ReportRequest : 请求生成报表(向 CompletionService 发送 ReportGenerator 任务)
private String name; private CompletionService<String> service; public ReportRequest(String name, CompletionService<String> service) { this.name = name; this.service = service; }
@Override public void run() { for (int i = 0; i < 3; i++) { ReportGenerator reportGenerator = new ReportGenerator(name, "Report"); service.submit(reportGenerator); } }
线程类 ReportProcessor : 对报表进行处理(从 CompletionService 获取 Future 结果并处理)
private CompletionService<String> service; private boolean end; public ReportProcessor(CompletionService<String> service) { this.service = service; end = false; }
@Override public void run() { while (!end) { try { Future<String> result = service.poll(20, TimeUnit.SECONDS); if (result != null) { String report = result.get(); System.out.printf("ReportReceiver: Report Received: %s\n", report); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } System.out.printf("ReportSender: End\n"); }
public void setEnd(boolean end) { this.end = end; }
控制类 : Main
ExecutorService executor = Executors.newCachedThreadPool();
CompletionService<String> service = new ExecutorCompletionService<>(executor);
ReportRequest faceRequest = new ReportRequest("Face", service); ReportRequest onlineRequest = new ReportRequest("Online", service); Thread faceThread = new Thread(faceRequest); Thread onlineThread = new Thread(onlineRequest);
ReportProcessor processor = new ReportProcessor(service); Thread senderThread = new Thread(processor);
System.out.printf("Main: Starting the Threads\n"); faceThread.start(); onlineThread.start(); senderThread.start(); try { System.out.printf("Main: Waiting for the report generators.\n"); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.printf("Main: Shutting down the executor.\n"); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
processor.setEnd(true); System.out.println("Main: Ends");
在 Main 类中,使用 Executors 类的 newCachedThreadPool() 方法创建 ThreadPoolExecutor。 然后使用该对象初始化一个 CompletionService 对象,因为 CompletionService 使用一个执行者来执行它的任务。 如 ReportRequest 类中一样,使用 submit() 方法让 CompletionService 执行任务。
当 CompletionService 结束了其中一个任务的执行,它将用来控制任务执行的 Future 对象存入一个队列。 poll() 方法访问此队列看是否有任务已经完成了它的执行,如果有,则将队列的第一个元素(控制该已完成任务的Future对象)返回。 并将其从队列中删除。 在本例中,poll()方法的2个参数表示在已完成任务的结果队列为空时,希望等多长时间直到任务完成。
CompletionService 对象被创建后,创建了 2 个 ReportRequest 对象,每个提供 3 个 ReportGenerator 任务给 CompletionService 执行。 一个 ReportSender 任务将处理 2 个 ReportRequest 对象发送的任务产生的结果。
CompletionService 类能执行 Callable 或 Runnable 任务。本例使用的是 Callable,但也可以使用 Runnable 对象。 因为 Runnable 对象不产生结果,CompletionService 类哲学上并不宜应用到此场景。
CompletionService 类提供了另外 2 个方法来获得已完成任务的 Future 对象。这些方法如下: