取消一个任务

在一个 ForkJoinPool 类中执行 ForkJoinTask 任务时,可在任务开始执行前取消任务。 ForkJoinTask 类提供了 cancel() 方法来达成此目的。在取消一个任务时要考虑以下几点:

  • ForkJoinPool 类不提供方法取消在运行中或在池中等待中的任务
  • 取消一个任务时,不取消此任务已经执行了的任务

任务

在一个数组中查找一个数字的位置。第一个找到此数字的任务将取消其余的任务。 因为 Fork/Join 框架不提供此功能,您将实现一个辅助类来取消。

实现

  • 辅助类 : ArrayGenerator : 生成指定大小的随机10以内整数数组

    public class ArrayGenerator {
    
        public int[] generateArray(int size) {
            int array[] = new int[size];
            Random random = new Random();
            for (int i = 0; i < size; i++) {
                array[i] = random.nextInt(10);
            }
            return array;
        }
    
    }
    
  • 辅助类 :TaskManager : 保存在 ForkJoinPool 中执行的所有任务。 因为 ForkJoinPool 和 ForkJoinTask 的限制,将用此类来取消池子里的任务

    • 属性 tasks : List<ForkJoinTask<Integer>> 对象
    • 在构造函数初始化 tasks 为一个空的列表
        private List<ForkJoinTask<Integer>> tasks;
    
        public TaskManager() {
            tasks = new ArrayList<>();
        }
    
    • addTask() 方法:把 ForkJoinTask 任务加到任务列表
        public void addTask(ForkJoinTask<Integer> task) {
            tasks.add(task);
        }
    
    • cancelTasks() 方法:使用 cancel() 方法取消保存在任务列表里的除参数指定的以外的所有 ForkJoinTask 对象。
        public void cancelTasks(ForkJoinTask<Integer> cancelTask) {
            for (ForkJoinTask<Integer> task : tasks) {
                if (task != cancelTask) {
                    task.cancel(true);
                    ((SearchNumberTask) task).writeCancelMessage();
                }
            }
        }
    
  • 任务类:SearchNumberTask : extends RecursiveTask<Integer>

    • 序列化版本UID : serialVersionUID
    • 属性:int数组 numbers及起始值 start 、结束值 end,要查找的数字 number
    • 属性:TaskManager manager : 将用此对象取消所有任务
    • 常量 NOT_FOUND (-1) : 任务没找到数字时的返回值
    • 构造函数将各属性初始化为参数值
        private static final long serialVersionUID = 1L;
    
        private int numbers[];
        private int start, end;
        private int number;
    
        private TaskManager manager;
    
        private final static int NOT_FOUND = -1;
    
        public SearchNumberTask(int[] numbers, int start, int end, int number, TaskManager manager) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
            this.number = number;
            this.manager = manager;
        }
    
    • compute() 方法
      1. 输出任务的 start 、end 属性值
      2. 如果 start 和 end 之间相差超过10,则调用 launchTasks() 方法将这些任务分成2个子任务
      3. 否则调用 lookForNumber 方法在数组的这块内容中查找指定数字
      4. 返回结果
        @Override
        protected Integer compute() {
            System.out.println("Task: " + start + ":" + end);
    
            int ret;
            if (end - start > 10) {
                ret = launchTasks();
            } else {
                ret = lookForNumber();
            }
    
            return ret;
        }
    
    • lookForNumber() 方法:
      1. 将此块中的数字逐个与指定数字比较
        如果相等,则:
        1. 输出信息到终端
        2. 用 TaskManager 对象的 cancelTasks() 方法取消所有任务
        3. 返回找到数字的元素位置
      2. 每比较一个数字睡上1秒
      3. 最后返回 NOT_FOUND
        private int lookForNumber() {
            for (int i = start; i < end; i++) {
                if (numbers[i] == number) {
                    System.out.printf("Task: Number %d found in position %d\n", number, i);
                    manager.cancelTasks(this);
                    return i;
                }
    
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            return NOT_FOUND;
        }
    
    • launchTasks() 方法
      1. 将处理区间均分为2个并创建2个 SearchNumberTask 来处理它们
      2. 将创建的2个任务加到 TaskManager 对象
      3. 调用 fork() 方法异步执行这两个任务
      4. 等待任务结束并返回任务结果(优先返回找到的位置而不是NOT_FOUND)
        private int launchTasks() {
            int mid = (start + end) / 2;
            SearchNumberTask task1 = new SearchNumberTask(numbers, start, mid, number, manager);
            SearchNumberTask task2 = new SearchNumberTask(numbers, mid, end, number, manager);
    
            manager.addTask(task1);
            manager.addTask(task2);
    
            task1.fork();
            task2.fork();
    
            int returnValue;
            returnValue = task1.join();
            if (returnValue != NOT_FOUND) {
                return returnValue;
            }
    
            returnValue = task2.join();
            return returnValue;
        }
    
    • writeCancelMessage() 方法 :当任务被取消时,输出一些信息到终端
        public void writeCancelMessage() {
            System.out.printf("Task: Canceled task from %d to %d\n", start, end);
        }
    
  • 控制类 :Main

    1. 用 ArrayGenerator 创建一个有 1000 个数字的数组
    2. 创建一个 TaskManager 对象
    3. 用默认构造函数创建一个 ForkJoinPool 对象
    4. 创建一个 SearchNumberTask 对象来处理之前生成的数组, 查找数字5
    5. 调用 execute() 方法在池子中异步执行任务
    6. 调用 shutdown() 方法停止池子
    7. 调用池子的 awaitTermination() 方法等待任务的完成
    8. 输出结束信息
            ArrayGenerator generator = new ArrayGenerator();
            int array[] = generator.generateArray(1000);
    
            TaskManager manager = new TaskManager();
            ForkJoinPool pool = new ForkJoinPool();
    
            SearchNumberTask task = new SearchNumberTask(array, 0, 1000, 5, manager);
            pool.execute(task);
            pool.shutdown();
    
            try {
                pool.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.printf("Main: The program has finished\n");
    

讲解

ForkJoinTask 类提供 cancel() 方法使得可以取消一个 尚未执行 的任务。如果任务已经开始执行,则调用cancel()方法没有效果。 此方法接受一个名为 mayInterruptIfRunning 的 Boolean 类型参数。 此变量名可能会让您认为,如果传递 true 到此方法,即使任务在运行也将被取消。 Java API 文档明确提出,在 ForkJoinTask 类的默认实现中,此属性没有效果。任务只在尚未开始执行时才能被取消。 任务被送到池子(不是ForkJoinPool的内部任务队列,而是分配了线程进入线程池)后,对任务的取消就没有效果了,它们将继续执行。

Fork/Join 框架的一个限制是它没有提供取消在 ForkJoinPool 中的所有任务的方法。 为了跳过此限制,实现了 TaskManager 类。它保存所有在 ForkJoinPool 中的所有任务。它提供了一个方法取消所有它保存的任务。 如果一个任务因为它在运行或已经结束而不能被取消时,cancel()方法返回 false,这样您可以尝试取消所有任务而不用担心有什么可能的副作用。

本例中,实现了一个在数字数组里查找一个数字的任务。您按Fork/Join框架推荐的将任务分解成较小的子任务。 您只对数字的出现(而不是出现次数)感兴趣,所有当找到它时,取消其他的任务。