Executor框架此机制能让您分离线程的创建和执行。 它基于 Executor 和 ExecutorService 接口,以及实现这2个接口的 ThreadPoolExecutor 类。 它有个内部的线程池并提供方法让您提交各种任务使其在线程池中执行。这些任务是:
两种情况,您只能提交任务到执行者。执行者使用线程池中的一个线程或者创建一个新的线程来执行那些任务。 执行者也决定任务什么时刻执行。
本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect02 package中
ThreadPoolExecutor 扩展类 :MyExecutor
private ConcurrentHashMap<String, Date> startTimes; public MyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); startTimes = new ConcurrentHashMap<>(); }
@Override public void shutdown() { System.out.printf("MyExecutor: Going to shutdown.\n"); System.out.printf("MyExecutor: Executed tasks: %d\n", getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d\n", getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d\n", getQueue().size()); super.shutdown(); }
@Override public List<Runnable> shutdownNow() { System.out.printf("MyExecutor: Going to immediately shutdown.\n"); System.out.printf("MyExecutor: Executed tasks: %d\n", getCompletedTaskCount()); System.out.printf("MyExecutor: Running tasks: %d\n", getActiveCount()); System.out.printf("MyExecutor: Pending tasks: %d\n", getQueue().size()); return super.shutdownNow(); }
@Override protected void beforeExecute(Thread t, Runnable r) { System.out.printf("MyExecutor: A task is beginning: %s : %s\n", t.getName(), r.hashCode()); startTimes.put(String.valueOf(r.hashCode()), new Date()); }
@Override protected void afterExecute(Runnable r, Throwable t) { Future<?> result = (Future<?>) r; try { System.out.printf("*********************************\n"); System.out.printf("MyExecutor: A task is finishing.\n"); System.out.printf("MyExecutor: Result: %s\n", result.get()); Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); System.out.printf("MyExecutor: Duration: %d\n", diff); System.out.printf("*********************************\n"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
Callable<String> 类 : SleepTwoSecondsTask
@Override public String call() throws Exception { TimeUnit.SECONDS.sleep(2); return new Date().toString(); }
控制类 : Main
MyExecutor myExecutor = new MyExecutor( 2, 4, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>() ); List<Future<String>> results = new ArrayList<>(); for (int i = 0; i < 10; i++) { SleepTwoSecondsTask task = new SleepTwoSecondsTask(); Future<String> result = myExecutor.submit(task); results.add(result); } for (int i = 0; i < 5; i++) { try { String result = results.get(i).get(); System.out.printf("Main: Result for Task %d : %s\n", i, result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } myExecutor.shutdown(); for (int i = 5; i < 10; i++) { try { String result = results.get(i).get(); System.out.printf("Main: Result for Task %d : %s\n", i, result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } try { myExecutor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: End of the program.\n");
本节中扩展了 ThreadPoolExecutor 类实现了定制执行者并覆盖了4个方法。
实现了 Callable 接口的 SleepTwoSecondsTask 类让执行线程睡上2秒; 在 Main 类中,把 10 个任务送到了执行者并用执行者和其他类演示了它们的特性。