创建一个 Fork/Join 池

本节中,您将学习如何使用 Fork/Join 框架的基本元素。包括:

  • 创建一个 ForkJoinPool 对象来执行任务
  • 创建一个 ForkJoinTask 的子类来在池中执行

本例中要用到的 Fork/Join 框架的主要特点如下:

  • 用默认构造函数创建 ForkJoinPool
  • 在任务中,使用 Java API 文档建议的结构:

    If (problem size > default size){
        tasks=divide(task);
        execute(tasks);
    } else {
        resolve problem using another algorithm;
    }
    
  • 以同步方式执行任务。当一个任务执行2个或多个子任务,它等待它们的结束。 这种方式工作线程将查找执行其他任务,充分利用运行时间。

  • 您将实现的任务不返回结果,这样将用 RecursiveAction 类作为基类

任务

更新一个产品清单的价格。初始任务负责更新清单里的所有元素。 使用10作为参考规模,这样如果一个任务要更新多于10个元素,它将把分配给它的清单分成2份并创建2个任务更新各自部分的产品价格。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt5.sect02 package中

  • 产品 POJO 类: Product

    • String 类型属性 name、double 类型属性 price 及它们的 getter/setter 方法
  • 辅助类 ProductListGenerator : 用于生成 Product 列表供演示

    • 产品名为 ProductN,价格都为 10
    public class ProductListGenerator {
    
        public List<Product> generate(int size) {
            List<Product> ret = new ArrayList<Product>();
    
            for (int i = 0; i < size; i++) {
                Product product = new Product();
                product.setName("Product " + i);
                product.setPrice(10);
                ret.add(product);
            }
    
            return ret;
        }
    
    }
    
  • RecursiveAction 扩展类 : Task

    • 因为 RecursiveAction 的父类 ForkJoinTask 实现了 Serializable 接口,所以需要声明一个序列版本UID
        private static final long serialVersionUID = 1L;
    
    • 属性 products : List<Product> : 待改价格的产品列表
    • int 类型属性 first, last : 此产品将修改部分在列表里的起始和结束位置
    • double 类型属性 increment: 产品价格的增幅
    • 构造函数初始化上述属性为参数值
        private List<Product> products;
        private int first;
        private int last;
        private double increment;
    
        public Task(List<Product> products, int first, int last, double increment) {
            this.products = products;
            this.first = first;
            this.last = last;
            this.increment = increment;
        }
    
    • 覆盖实现 compute() 方法 :实现任务逻辑
      • 如果 last 和 first 之间的区别小于 10, 则直接调用 updatePrices() 方法更新产品价格
      • 否则创建2个Task(一个处理前半部分,一个处理后半部分)并调用 invokeAll 方法执行它们
        @Override
        protected void compute() {
            if (last - first < 10) {
                updatePrices();
            } else {
                int middle = (last + first) / 2;
                System.out.printf("Task: Pending tasks: %s\n", getQueuedTaskCount());
                Task t1 = new Task(products, first, middle + 1, increment);
                Task t2 = new Task(products, middle + 1, last, increment);
                invokeAll(t1, t2);
            }
        }
    
    • 实现 updatePrices() 方法,更新列表中 last 和 first 之间的产品价格
        private void updatePrices() {
            for (int i = first; i < last; i++) {
                Product product = products.get(i);
                product.setPrice(product.getPrice() * (1 + increment));
            }
        }
    
  • 控制类 : Main

    • 使用 ProductListGenerator 生成一个包含 10000 个产品的列表
        ProductListGenerator generator = new ProductListGenerator();
        List<Product> products = generator.generate(10000);
    
    • 创建一个 Task 对象以更新列表中所有产品的价格
        Task task = new Task(products, 0, products.size(), 0.20);
    
    • 创建一个 ForkJoinPool 对象并用 execute() 方法执行池中的任务
        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(task);
    
    • 每隔 5 毫秒显示池的一些参数信息,直到任务结束
        do {
            System.out.printf("Main: Thread Count: %d\n", pool.getActiveThreadCount());
            System.out.printf("Main: Thread Steal: %d\n", pool.getStealCount());
            System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
            try {
                TimeUnit.MILLISECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (!task.isDone());
    
    • 用 shutdown() 方法停止 pool
        pool.shutdown();
    
    • 检查任务是否正常完成,修改结果是否正确
        if (task.isCompletedNormally()) {
            System.out.printf("Main: The process has completed normally.\n");
        }
    
        for (Product product : products) {
            if (product.getPrice() != 12) {
                System.out.printf("Product %s: %f\n", product.getName(), product.getPrice());
            }
        }
    
        System.out.println("Main: End of the program.\n");
    

工作原理

在本例中创建了一个 ForkJoinPool 对象和一个在池中运行的 ForkJoinTask 的子类。 为了创建此 ForkJoinPool 对象,用了无参的构造函数,所以它将用它的默认配置执行。 它创建了一个线程池,池中的线程数和电脑处理器的数目一样多。 当 ForkJoinPool 被创建时,那些线程也被创建并在池中等待直到某些任务来执行。

因为 Task 类不返回结果,它扩展 RecursiveAction 类。在本节中使用了推荐的结构来实现任务。 如果任务必须更新超过10个产品,它将元素集分成2块、创建2个任务、分配每个任务执行一块。 通过 first 和 last 属性可知道此任务在产品列表中要更新的产品的起始位置。 各个任务也只用一个产品列表而不需要为每个任务创建不同的列表。

为了执行任务创建的子任务,调用了 invokeAll() 方法。它是一个同步调用,在继续执行前等待子任务完成。 当任务在等待它的子任务时,执行此任务的工作线程获取另一个在等待中的任务并执行它。 因此行为,Fork/Join 框架在任务管理方面比 Runnable 和 Callable 对象更有效。

ForkJoinTask 类的 invokeAll() 方法是与 Executor 和 Fork/Join 框架之间的主要区别之一。 Executor 框架中,所有任务必须被提交给执行者;而在本例中,任务包含执行、控制任务的方法(如invokeAll等)。

您用了 execute() 方法发送了一个唯一的任务到线程池中更新列表中的所有产品。本例中它是一个异步调用,主线程继续执行。

您用了 ForkJoinPool 类的一些方法来检查池的状态和运行中任务的演变。此类还包含更多有用的方法。

最后,类似 Executor 框架,您应当用 shutdown() 方法结束 ForkJoinPool。

了解更多

ForkJoinPool 提供了其他的方法来执行任务。如下:

  • execute (Runnable task): 接受 Runnable 任务。 注意用 Runnable 对象时,ForkJoinPool 不使用工作窃取算法。工作窃取算法只用于 ForkJoinTask 对象。

  • invoke(ForkJoinTask<T> task): execute() 方法是异步调用,而此方法是同步调用。 此调用直到作为参数传递的任务结束执行了才返回。

  • 您也能使用在 ExecutorService 接口中声明的 invokeAll() 和 invokeAny() 方法。这些方法接受 Callable 对象作为参数。 ForkJoinPool 类在用 Callable 对象时不使用工作窃取算法,所以您最好用一个 executor 来执行它们。

ForkJoinPool 也包含一些其他版本的 invokeAll() 方法,如下:

  • invokeAll(ForkJoinTask<?> tasks)
  • invokeAll(Collection<T> tasks): 泛型 T 必须是 ForkJoinTask 类或子类

尽管 ForkJoinPool 类设计来执行一个 ForkJoinTask 对象,您也能直接执行 Runnable 和 Callable 对象。 您也可以使用 ForkJoinTask 的 adapt() 方法把一个 Callable 或 Runnable 对象转换成一个 ForkJoinTask 对象来执行任务。