在一个 ForkJoinPool 类中执行 ForkJoinTask 任务时,可在任务开始执行前取消任务。 ForkJoinTask 类提供了 cancel() 方法来达成此目的。在取消一个任务时要考虑以下几点:
辅助类 : ArrayGenerator : 生成指定大小的随机10以内整数数组
public class ArrayGenerator { public int[] generateArray(int size) { int array[] = new int[size]; Random random = new Random(); for (int i = 0; i < size; i++) { array[i] = random.nextInt(10); } return array; } }
辅助类 :TaskManager : 保存在 ForkJoinPool 中执行的所有任务。 因为 ForkJoinPool 和 ForkJoinTask 的限制,将用此类来取消池子里的任务
private List<ForkJoinTask<Integer>> tasks; public TaskManager() { tasks = new ArrayList<>(); }
public void addTask(ForkJoinTask<Integer> task) { tasks.add(task); }
public void cancelTasks(ForkJoinTask<Integer> cancelTask) { for (ForkJoinTask<Integer> task : tasks) { if (task != cancelTask) { task.cancel(true); ((SearchNumberTask) task).writeCancelMessage(); } } }
任务类:SearchNumberTask : extends RecursiveTask<Integer>
private static final long serialVersionUID = 1L; private int numbers[]; private int start, end; private int number; private TaskManager manager; private final static int NOT_FOUND = -1; public SearchNumberTask(int[] numbers, int start, int end, int number, TaskManager manager) { this.numbers = numbers; this.start = start; this.end = end; this.number = number; this.manager = manager; }
@Override protected Integer compute() { System.out.println("Task: " + start + ":" + end); int ret; if (end - start > 10) { ret = launchTasks(); } else { ret = lookForNumber(); } return ret; }
private int lookForNumber() { for (int i = start; i < end; i++) { if (numbers[i] == number) { System.out.printf("Task: Number %d found in position %d\n", number, i); manager.cancelTasks(this); return i; } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } return NOT_FOUND; }
private int launchTasks() { int mid = (start + end) / 2; SearchNumberTask task1 = new SearchNumberTask(numbers, start, mid, number, manager); SearchNumberTask task2 = new SearchNumberTask(numbers, mid, end, number, manager); manager.addTask(task1); manager.addTask(task2); task1.fork(); task2.fork(); int returnValue; returnValue = task1.join(); if (returnValue != NOT_FOUND) { return returnValue; } returnValue = task2.join(); return returnValue; }
public void writeCancelMessage() { System.out.printf("Task: Canceled task from %d to %d\n", start, end); }
控制类 :Main
ArrayGenerator generator = new ArrayGenerator(); int array[] = generator.generateArray(1000); TaskManager manager = new TaskManager(); ForkJoinPool pool = new ForkJoinPool(); SearchNumberTask task = new SearchNumberTask(array, 0, 1000, 5, manager); pool.execute(task); pool.shutdown(); try { pool.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: The program has finished\n");
ForkJoinTask 类提供 cancel() 方法使得可以取消一个 尚未执行 的任务。如果任务已经开始执行,则调用cancel()方法没有效果。 此方法接受一个名为 mayInterruptIfRunning 的 Boolean 类型参数。 此变量名可能会让您认为,如果传递 true 到此方法,即使任务在运行也将被取消。 Java API 文档明确提出,在 ForkJoinTask 类的默认实现中,此属性没有效果。任务只在尚未开始执行时才能被取消。 任务被送到池子(不是ForkJoinPool的内部任务队列,而是分配了线程进入线程池)后,对任务的取消就没有效果了,它们将继续执行。
Fork/Join 框架的一个限制是它没有提供取消在 ForkJoinPool 中的所有任务的方法。 为了跳过此限制,实现了 TaskManager 类。它保存所有在 ForkJoinPool 中的所有任务。它提供了一个方法取消所有它保存的任务。 如果一个任务因为它在运行或已经结束而不能被取消时,cancel()方法返回 false,这样您可以尝试取消所有任务而不用担心有什么可能的副作用。
本例中,实现了一个在数字数组里查找一个数字的任务。您按Fork/Join框架推荐的将任务分解成较小的子任务。 您只对数字的出现(而不是出现次数)感兴趣,所有当找到它时,取消其他的任务。