本节中,您将学习如何使用 Fork/Join 框架的基本元素。包括:
本例中要用到的 Fork/Join 框架的主要特点如下:
在任务中,使用 Java API 文档建议的结构:
If (problem size > default size){ tasks=divide(task); execute(tasks); } else { resolve problem using another algorithm; }
以同步方式执行任务。当一个任务执行2个或多个子任务,它等待它们的结束。 这种方式工作线程将查找执行其他任务,充分利用运行时间。
本节的示例代码在 com.elanzone.books.noteeg.chpt5.sect02 package中
产品 POJO 类: Product
辅助类 ProductListGenerator : 用于生成 Product 列表供演示
public class ProductListGenerator { public List<Product> generate(int size) { List<Product> ret = new ArrayList<Product>(); for (int i = 0; i < size; i++) { Product product = new Product(); product.setName("Product " + i); product.setPrice(10); ret.add(product); } return ret; } }
RecursiveAction 扩展类 : Task
private static final long serialVersionUID = 1L;
private List<Product> products; private int first; private int last; private double increment; public Task(List<Product> products, int first, int last, double increment) { this.products = products; this.first = first; this.last = last; this.increment = increment; }
@Override protected void compute() { if (last - first < 10) { updatePrices(); } else { int middle = (last + first) / 2; System.out.printf("Task: Pending tasks: %s\n", getQueuedTaskCount()); Task t1 = new Task(products, first, middle + 1, increment); Task t2 = new Task(products, middle + 1, last, increment); invokeAll(t1, t2); } }
private void updatePrices() { for (int i = first; i < last; i++) { Product product = products.get(i); product.setPrice(product.getPrice() * (1 + increment)); } }
控制类 : Main
ProductListGenerator generator = new ProductListGenerator(); List<Product> products = generator.generate(10000);
Task task = new Task(products, 0, products.size(), 0.20);
ForkJoinPool pool = new ForkJoinPool(); pool.execute(task);
do { System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount()); System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount()); System.out.printf("Main: Parallelism: %d\n", pool.getParallelism()); try { TimeUnit.MILLISECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone());
pool.shutdown();
if (task.isCompletedNormally()) { System.out.printf("Main: The process has completed normally.\n"); } for (Product product : products) { if (product.getPrice() != 12) { System.out.printf("Product %s: %f\n", product.getName(), product.getPrice()); } } System.out.println("Main: End of the program.\n");
在本例中创建了一个 ForkJoinPool 对象和一个在池中运行的 ForkJoinTask 的子类。 为了创建此 ForkJoinPool 对象,用了无参的构造函数,所以它将用它的默认配置执行。 它创建了一个线程池,池中的线程数和电脑处理器的数目一样多。 当 ForkJoinPool 被创建时,那些线程也被创建并在池中等待直到某些任务来执行。
因为 Task 类不返回结果,它扩展 RecursiveAction 类。在本节中使用了推荐的结构来实现任务。 如果任务必须更新超过10个产品,它将元素集分成2块、创建2个任务、分配每个任务执行一块。 通过 first 和 last 属性可知道此任务在产品列表中要更新的产品的起始位置。 各个任务也只用一个产品列表而不需要为每个任务创建不同的列表。
为了执行任务创建的子任务,调用了 invokeAll() 方法。它是一个同步调用,在继续执行前等待子任务完成。 当任务在等待它的子任务时,执行此任务的工作线程获取另一个在等待中的任务并执行它。 因此行为,Fork/Join 框架在任务管理方面比 Runnable 和 Callable 对象更有效。
ForkJoinTask 类的 invokeAll() 方法是与 Executor 和 Fork/Join 框架之间的主要区别之一。 Executor 框架中,所有任务必须被提交给执行者;而在本例中,任务包含执行、控制任务的方法(如invokeAll等)。
您用了 execute() 方法发送了一个唯一的任务到线程池中更新列表中的所有产品。本例中它是一个异步调用,主线程继续执行。
您用了 ForkJoinPool 类的一些方法来检查池的状态和运行中任务的演变。此类还包含更多有用的方法。
最后,类似 Executor 框架,您应当用 shutdown() 方法结束 ForkJoinPool。
ForkJoinPool 提供了其他的方法来执行任务。如下:
execute (Runnable task): 接受 Runnable 任务。 注意用 Runnable 对象时,ForkJoinPool 不使用工作窃取算法。工作窃取算法只用于 ForkJoinTask 对象。
invoke(ForkJoinTask<T> task): execute() 方法是异步调用,而此方法是同步调用。 此调用直到作为参数传递的任务结束执行了才返回。
ForkJoinPool 也包含一些其他版本的 invokeAll() 方法,如下:
尽管 ForkJoinPool 类设计来执行一个 ForkJoinTask 对象,您也能直接执行 Runnable 和 Callable 对象。 您也可以使用 ForkJoinTask 的 adapt() 方法把一个 Callable 或 Runnable 对象转换成一个 ForkJoinTask 对象来执行任务。