使用 Phaser 类来执行并发阶段性任务的能力是Java concurrency API提供的最复杂、强大的功能之一。 此机制在当我们有一些并发任务被分成很多步时有用。Phaser 类使我们可以在每一步骤结束点同步各线程, 所有线程都完成第一步时,才会开始执行第二步。
和其他的同步工具一样,必须用参与同步操作的任务数来初始化 Phaser 类,但是能通过增减来动态修改此数字。
在 3 个不同的文件夹及其子文件夹中查找后缀为 .log 、在最近 24 小时内有修改的文件的 3 个任务,被分成以下 3 步:
在第1步和第2步结束时都检查一下看列表是否为空。如果为空,则结束线程并将此线程从 Phaser 类中去除。
文件搜索线程类 : FileSearch
private String initPath;// 在此文件夹及子文件夹中查找 private String end; // 文件结束字符串(后缀) private List<String> results; private Phaser phaser; // 控制任务不同步骤的同步 public FileSearch(String initPath, String end, Phaser phaser) { this.initPath = initPath; this.end = end; this.phaser = phaser; results = new ArrayList<String>(); }
private void directoryProcess(File file) { File list[] = file.listFiles(); if (list != null) { for (File aList : list) { if (aList.isDirectory()) { directoryProcess(aList); } else { fileProcess(aList); } } } }
private void fileProcess(File file) { // 文件名以end结尾的 if (file.getName().endsWith(end)) { results.add(file.getAbsolutePath()); } }
private void filterResults() { List<String> newResults = new ArrayList<>(); long actualDate = new Date().getTime(); for (String result : results) { File file = new File(result); long fileDate = file.lastModified(); // 最后修改时间在一天内的 if ((actualDate - fileDate) < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)) { newResults.add(result); } } results = newResults; }
private boolean checkResults() { if (results.isEmpty()) { System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase()); System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase()); phaser.arriveAndDeregister(); // 到达此检录点并退出比赛 return false; } else { System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size()); phaser.arriveAndAwaitAdvance(); // 到达此检录点并等待其他参赛者以继续下一阶段比赛 return true; } }
private void showInfo() { for (String result : results) { File file = new File(result); System.out.printf("%s: %s\n", Thread.currentThread().getName(), file.getAbsolutePath()); } phaser.arriveAndAwaitAdvance(); // 到达此检录点并等待其他参赛者以继续下一阶段比赛 }
phaser.arriveAndAwaitAdvance(); System.out.printf("%s: Starting.\n",Thread.currentThread().getName()); File file = new File(initPath); if (file.isDirectory()) { directoryProcess(file); } if (!checkResults()){ return; } filterResults(); if (!checkResults()){ return; } showInfo(); phaser.arriveAndDeregister(); // 已到达终点,退出比赛 System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
控制类 : Main
Phaser phaser = new Phaser(3); FileSearch system = new FileSearch("C:\\Windows", "log", phaser); FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser); FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser); Thread systemThread = new Thread(system, "System"); systemThread.start(); Thread appsThread = new Thread(apps, "Apps"); appsThread.start(); Thread documentsThread = new Thread(documents, "Documents"); documentsThread.start(); try { systemThread.join(); appsThread.join(); documentsThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Terminated: " + phaser.isTerminated());
程序开始创建一个将控制线程在每个阶段结束时的同步的 Phaser 对象。Phaser 的构造函数的参数有参与者数目(本例为3)。 此数字告知 Phaser 在改变阶段和唤醒睡眠中线程前必须执行一次 arriveAndAwaitAdvance() 方法的线程数目。
Phaser一被创建,我们就启动了3个执行3个不同的 FileSearch 对象的线程。
本例使用了Windows操作系统的路径。如果您工作在其他操作系统,请根据您的环境修改路径
FileSearch 对象的run方法中的第一条语句是一个对 Phaser 对象的 arriveAndAwaitAdvance() 方法的调用。 Phaser 知道我们想同步的线程数目。当一个线程调用此方法,Phaser减少必须结束当前阶段的线程数目并将此线程催眠直到所有剩余线程结束当前阶段。 在 run 方法的开始调用此方法,使得所有 FileSearch 线程都被创建了才开始工作。
在阶段1和阶段2, 检查此阶段是否生成结果和结果列表中是否有内容.
在 showInfo() 方法中实现的第3阶段的结尾, 有一个对 phaser 的 arriveAndAwaitAdvance() 方法的调用. 通过此调用, 我们保证所有的线程在同一时间结束. 当此方法结束执行, 有一个对 phaser 的 arriveAndDeregister() 方法的调用. 通过此调用, 我们注销了 phaser 的线程, 这样当所有线程结束, phaser 将没有参与者.
最后, main() 方法等待3个线程的完成, 并调用 phaser 的 isTerminated() 方法. 当 phaser 没有参与者, 它进入结束状态, isTerminated() 方法返回 true. 当我们注销了 phaser 的所有线程, 它将进入结束状态, 此调用将打印 true 到终端.
一个 Phaser 对象有 2 种状态:
Phaser 类的一个显著的特点是您不必控制 phaser 相关方法的异常. 不像其他同步工具,在一个 phaser 中睡眠的线程不会响应中断时间,不抛出 InterruptedException 异常. 只有一个异常,将在后面解释.
Phaser 类提供了其他关于阶段变化的其他方法,如下: