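The Fork/Join framework can execute tasks that return a result. Such tasks are implemented with the RecursiveTask class, which extends ForkJoinTask and thereby implements the Future interface provided by the Executor framework.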
Inside the task, you should use the structure recommended by the Java API documentation:

    if (problem size > size) {
        tasks = divide(task);
        execute(tasks);
        groupResults();
        return result;
    } else {
        resolve problem;
        return result;
    }
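The structure above can be sketched as a small self-contained RecursiveTask. This is only an illustration and not part of this section's example: the names SumExample and SumTask, the THRESHOLD value of 1,000 and the array-summing problem are all assumptions chosen to keep the sketch short.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumExample {

    // Hypothetical task: sums numbers[start, end) using the split-or-solve structure.
    static class SumTask extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 1_000; // arbitrary cut-off chosen for this sketch

        private final int[] numbers;
        private final int start, end;

        SumTask(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start > THRESHOLD) {
                // Problem too large: divide it, execute the subtasks, group their results.
                int mid = start + (end - start) / 2;
                SumTask left = new SumTask(numbers, start, mid);
                SumTask right = new SumTask(numbers, mid, end);
                invokeAll(left, right);
                return left.join() + right.join();
            }
            // Problem small enough: resolve it directly.
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            return sum;
        }
    }

    // Runs the task in its own pool and returns the total.
    static long sum(int[] numbers) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(numbers, 0, numbers.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] numbers = new int[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println("Sum: " + sum(numbers)); // prints "Sum: 50005000"
    }
}
```

The threshold decides how fine-grained the decomposition is. A value that is too small creates many tiny tasks, while a value that is too large leaves worker threads idle.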
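This example develops an application that searches a document for a word. It implements two kinds of tasks: a document task (DocumentTask), which searches for the word in a range of lines of the document, and a line task (LineTask), which searches for the word in a range of words of one line.
Every task returns the number of occurrences of the word in the part of the document or line that it processes.
The sample code for this section is in the com.elanzone.books.noteeg.chpt5.sect03 package.
Document class: Document, which generates a matrix of strings that simulates a document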
    import java.util.Random;

    public class Document {

        // Vocabulary from which the mock document is built
        private String words[] = { "the", "hello", "goodbye", "package", "java",
                "thread", "pool", "random", "class", "main" };

        // Builds a numLines x numWords matrix of random words and
        // prints how many times the given word occurs in it
        public String[][] generateDocument(int numLines, int numWords, String word) {
            int counter = 0;
            String document[][] = new String[numLines][numWords];
            Random random = new Random();
            for (int i = 0; i < numLines; i++) {
                for (int j = 0; j < numWords; j++) {
                    int index = random.nextInt(words.length);
                    document[i][j] = words[index];
                    if (document[i][j].equals(word)) {
                        counter++;
                    }
                }
            }
            System.out.println("DocumentMock: The word appears " + counter + " times in the document");
            return document;
        }
    }
Line-processing task class: LineTask

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.RecursiveTask;

    public class LineTask extends RecursiveTask<Integer> {

        private static final long serialVersionUID = 2L;

        private String line[];
        private int start, end;
        private String word;

        public LineTask(String[] line, int start, int end, String word) {
            this.line = line;
            this.start = start;
            this.end = end;
            this.word = word;
        }

        @Override
        protected Integer compute() {
            Integer result = null;
            if (end - start < 100) {
                // Fewer than 100 words: count them directly
                result = count(line, start, end, word);
            } else {
                // Otherwise split the range in two and combine the partial counts
                int mid = (start + end) / 2;
                LineTask task1 = new LineTask(line, start, mid, word);
                LineTask task2 = new LineTask(line, mid, end, word);
                invokeAll(task1, task2);
                try {
                    result = groupResults(task1.get(), task2.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }

        // Counts the occurrences of word in line[start, end)
        private Integer count(String[] line, int start, int end, String word) {
            int counter = 0;
            for (int i = start; i < end; i++) {
                if (line[i].equals(word)) {
                    counter++;
                }
            }
            try {
                // Slow the task down so the example runs long enough to observe
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return counter;
        }

        // Adds the results of two subtasks
        private Integer groupResults(Integer number1, Integer number2) {
            return number1 + number2;
        }
    }
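LineTask runs its two halves with invokeAll(task1, task2). A commonly used alternative is to fork one half, compute the other half in the current thread, and then join the forked half. The following is a minimal sketch of that idiom and not the code of this section: the names ForkIdiomExample and CountTask are hypothetical, and the threshold of 100 simply mirrors the one used by LineTask.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkIdiomExample {

    // Hypothetical task: counts occurrences of word in line[start, end).
    static class CountTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private static final int THRESHOLD = 100; // mirrors LineTask; an assumption of this sketch

        private final String[] line;
        private final int start, end;
        private final String word;

        CountTask(String[] line, int start, int end, String word) {
            this.line = line;
            this.start = start;
            this.end = end;
            this.word = word;
        }

        @Override
        protected Integer compute() {
            if (end - start < THRESHOLD) {
                int counter = 0;
                for (int i = start; i < end; i++) {
                    if (line[i].equals(word)) {
                        counter++;
                    }
                }
                return counter;
            }
            int mid = start + (end - start) / 2;
            CountTask left = new CountTask(line, start, mid, word);
            CountTask right = new CountTask(line, mid, end, word);
            left.fork();                      // schedule the left half asynchronously
            int rightCount = right.compute(); // process the right half in this thread
            return left.join() + rightCount;  // wait for the left half and add both counts
        }
    }

    // Runs the task in its own pool and returns the number of occurrences.
    static int count(String[] line, String word) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new CountTask(line, 0, line.length, word));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] line = new String[1000];
        for (int i = 0; i < line.length; i++) {
            line[i] = (i % 2 == 0) ? "the" : "java";
        }
        System.out.println("Occurrences: " + count(line, "the")); // prints "Occurrences: 500"
    }
}
```

Because join() throws only unchecked exceptions, this variant needs no try/catch block around the combination step.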
Document-processing task class: DocumentTask

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.RecursiveTask;

    public class DocumentTask extends RecursiveTask<Integer> {

        private static final long serialVersionUID = 1L;

        private String document[][];
        private int start, end;
        private String word;

        public DocumentTask(String document[][], int start, int end, String word) {
            this.document = document;
            this.start = start;
            this.end = end;
            this.word = word;
        }

        @Override
        protected Integer compute() {
            int result = 0;
            if (end - start < 10) {
                // Fewer than 10 lines: hand each line to a LineTask
                result = processLines(document, start, end, word);
            } else {
                // Otherwise split the range of lines in two
                int mid = (start + end) / 2;
                DocumentTask task1 = new DocumentTask(document, start, mid, word);
                DocumentTask task2 = new DocumentTask(document, mid, end, word);
                invokeAll(task1, task2);
                try {
                    result = groupResults(task1.get(), task2.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }

        // Creates one LineTask per line in [start, end) and sums their results
        private Integer processLines(String[][] document, int start, int end, String word) {
            List<LineTask> tasks = new ArrayList<LineTask>();
            for (int i = start; i < end; i++) {
                LineTask task = new LineTask(document[i], 0, document[i].length, word);
                tasks.add(task);
            }
            invokeAll(tasks);
            int result = 0;
            for (LineTask task : tasks) {
                try {
                    result = result + task.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            return result;
        }

        // Adds the results of two subtasks
        private Integer groupResults(Integer number1, Integer number2) {
            return number1 + number2;
        }
    }
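Control class: Main

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class Main {

        public static void main(String[] args) {
            // Generate a mock document of 100 lines with 1000 words each
            Document mock = new Document();
            String[][] document = mock.generateDocument(100, 1000, "the");

            // Task that covers the whole document
            DocumentTask task = new DocumentTask(document, 0, 100, "the");

            // Run the task asynchronously in a pool created with the default constructor
            ForkJoinPool pool = new ForkJoinPool();
            pool.execute(task);

            // Print pool statistics once per second until the task has finished
            do {
                System.out.printf("******************************************\n");
                System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
                System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
                System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
                System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
                System.out.printf("******************************************\n");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (!task.isDone());

            pool.shutdown();

            // Wait for the pool to terminate
            try {
                pool.awaitTermination(1, TimeUnit.DAYS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // Read the total from the original task
            try {
                System.out.printf("Main: The word appears %d times in the document\n", task.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }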
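This example implements two different tasks: DocumentTask, which processes a range of lines of the document and either splits that range in two or, when it covers fewer than 10 lines, creates one LineTask per line; and LineTask, which processes a range of words of one line and either splits that range in two or, when it covers fewer than 100 words, counts the occurrences directly.
In the Main class, a ForkJoinPool object is created with the default constructor, and a DocumentTask is executed in it to process a document of 100 lines with 1,000 words per line. This task splits the problem using other DocumentTask objects and LineTask objects. When all the tasks have finished, the original task can be used to obtain the total number of occurrences of the word in the whole document. Because these tasks return a result, they extend the RecursiveTask class.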
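Main starts the task with execute(), which returns immediately and leaves the main thread free to print statistics. ForkJoinPool also offers submit() and invoke(). The sketch below compares the three. The names LaunchExample, LengthTask and launchAllWays are hypothetical, and the trivial task exists only to keep the comparison short.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class LaunchExample {

    // Hypothetical trivial task: returns the length of a string.
    static class LengthTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private final String text;

        LengthTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            return text.length();
        }
    }

    // Runs the same task three ways and returns the three results.
    static int[] launchAllWays(String text) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // execute(): asynchronous and returns nothing, so keep the task reference.
            LengthTask executed = new LengthTask(text);
            pool.execute(executed);
            int viaExecute = executed.get();

            // submit(): asynchronous and returns the task as a ForkJoinTask.
            ForkJoinTask<Integer> submitted = pool.submit(new LengthTask(text));
            int viaSubmit = submitted.get();

            // invoke(): blocks the caller until the task finishes and returns its result.
            int viaInvoke = pool.invoke(new LengthTask(text));

            return new int[] { viaExecute, viaSubmit, viaInvoke };
        } catch (InterruptedException | ExecutionException e) {
            // Wrapped only to keep the sketch's signature free of checked exceptions.
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] results = launchAllWays("hello");
        System.out.println(results[0] + " " + results[1] + " " + results[2]); // prints "5 5 5"
    }
}
```

invoke() is the shortest option when the caller has nothing else to do while the task runs. execute() or submit() fits cases like Main, where the caller keeps working in the meantime.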
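To obtain the result returned by a task, the example uses the get() method. This method is declared in the Future interface and implemented by ForkJoinTask, from which RecursiveTask inherits it.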
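ForkJoinTask also provides join(), which returns the same result as get() but reports failures differently. get() throws the checked InterruptedException and ExecutionException, while join() rethrows the task's failure as an unchecked exception. The following sketch shows both on a task that may fail. The names GetVsJoin, ParseTask, viaGet and viaJoin are hypothetical and are not part of this section's example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class GetVsJoin {

    // Hypothetical task: parses a string as an integer and fails on invalid input.
    static class ParseTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private final String text;

        ParseTask(String text) {
            this.text = text;
        }

        @Override
        protected Integer compute() {
            return Integer.parseInt(text); // throws NumberFormatException on invalid input
        }
    }

    // Reads the result with get(): failures arrive wrapped in an ExecutionException.
    static String viaGet(String text) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            ParseTask task = new ParseTask(text);
            pool.execute(task);
            return "value " + task.get();
        } catch (ExecutionException e) {
            return "ExecutionException wrapping " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            pool.shutdown();
        }
    }

    // Reads the result with join(): failures are rethrown as unchecked exceptions.
    static String viaJoin(String text) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            ParseTask task = new ParseTask(text);
            pool.execute(task);
            return "value " + task.join();
        } catch (NumberFormatException e) {
            return "NumberFormatException";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet("42"));
        System.out.println(viaJoin("42"));
        System.out.println(viaGet("not a number"));
        System.out.println(viaJoin("not a number"));
    }
}
```

Inside compute(), join() keeps the code free of try/catch blocks. get() is the natural choice where the task is handled through the Future interface.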