Fork/Join 框架是 Executor 和 ExecutorService 接口的一个实现, 这些接口让您可以执行 Callable 和 Runnable 任务而不用管理执行它们的线程。
Fork/Join 框架的目标是执行可以分成较小部分的任务。它的主要成分如下:
Fork/Join 框架的主类是 ForkJoinPool 类。在内部,它有以下两个元素:
在本节中,您将学习如何实现一个定制的工作线程用于一个 ForkJoinPool 类以及如何用一个工厂来使用它。
本节的示例代码在 com.elanzone.books.noteeg.chpt7.sect07 package中
工作线程类:MyWorkerThread :extends ForkJoinWorkerThread
private static ThreadLocal<Integer> taskCounter = new ThreadLocal<Integer>(); protected MyWorkerThread(ForkJoinPool pool) { super(pool); }
@Override protected void onStart() { super.onStart(); System.out.printf("MyWorkerThread %d: Initializing task counter.\n", getId()); taskCounter.set(0); }
@Override protected void onTermination(Throwable exception) { System.out.printf("MyWorkerThread %d: %d\n", getId(), taskCounter.get()); super.onTermination(exception); }
public void addTask() { int counter = taskCounter.get().intValue(); counter++; taskCounter.set(counter); }
工作线程工厂类 : MyWorkerThreadFactory : implements ForkJoinPool.ForkJoinWorkerThreadFactory
@Override public ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new MyWorkerThread(pool); }
Recursive 任务类 : MyRecursiveTask : extends RecursiveTask<Integer>
private int array[]; private int start, end; public MyRecursiveTask(int[] array, int start, int end) { this.array = array; this.end = end; this.start = start; }
@Override protected Integer compute() { MyWorkerThread thread = (MyWorkerThread) Thread.currentThread(); thread.addTask(); if ((end - start) < 100) { int ret = 0; for (int i = start; i < end; i++) { ret += array[i]; } return ret; } else { int mid = (end + start) / 2; MyRecursiveTask task1 = new MyRecursiveTask(array, start, mid); MyRecursiveTask task2 = new MyRecursiveTask(array, mid, end); invokeAll(task1, task2); return addResults(task1, task2); } }
private Integer addResults(MyRecursiveTask task1, MyRecursiveTask task2) { int value; try { value = task1.get() + task2.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); value = 0; } try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } return value; }
控制类 : Main
public static void main(String[] args) throws Exception { MyWorkerThreadFactory factory = new MyWorkerThreadFactory(); ForkJoinPool pool = new ForkJoinPool(4, factory, null, false); int array[] = new int[100000]; for (int i = 0; i < array.length; i++) { array[i] = 1; } MyRecursiveTask task = new MyRecursiveTask(array, 0, array.length); pool.execute(task); task.join(); pool.shutdown(); pool.awaitTermination(1, TimeUnit.DAYS); System.out.printf("Main: Result: %d\n", task.get()); System.out.printf("Main: End of the program\n"); }
Fork/Join 框架使用的线程被称为工作线程。 Java 包括 ForkJoinWorkerThread 类,该类扩展 Thread 类并实现了供 Fork/Join 框架使用的工作线程。
在本节中:
MyWorkerThread类
实现了扩展 ForkJoinWorkerThread 类的 MyWorkerThread 类并覆盖了 ForkJoinWorkerThread 类的2个方法。
MyWorkerThreadFactory 类
ForkJoinPool 类如 Java Concurrency API 中的所有执行者,用一个工厂创建它的线程, 所以如果要在一个 ForkJoinPool 类中使用 MyWorkerThread 线程,必须实现自己的线程工厂。 对于 Fork/Join 框架,此工厂必须实现 ForkJoinPool.ForkJoinWorkerThreadFactory 类。 为此,实现了 MyWorkerThreadFactory 类。此类只有一个创建一个新 MyWorkerThread 对象的方法
最后只需要用已创建的工厂初始化一个 ForkJoinPool 类。 本例中在 Main 类中使用 ForkJoinPool 类的构造函数做了这件事。