可使用 CyclicBarrier 类在一个已确定的点内同步2或更多线程。它比较像 CountDownLatch 类,但某些区别使它更强大。
CyclicBarrier 类用一个整数初始化,此整数是在一个确定的点中将被同步的线程数目。 当其中一个线程到达此点时,它调用 await() 方法等待其他线程。CyclicBarrier 类将阻塞催眠此线程直到其他线程到达。 当最后一个线程调用 await() 方法,将唤醒所有在等待中的线程继续执行。
CyclicBarrier 类的一个有趣的优点是您可多传递一个 Runnable 对象作为初始化的参数, 当所有线程到达共同点,CyclicBarrier类作为一个线程来执行此对象。 此特点使得此类能胜任使用分治编程技术的任务的并行。
待查找矩阵 : MatrixMock
private int data[][]; public MatrixMock(int size, int length, int number) { int counter = 0; data = new int[size][length]; Random random = new Random(); for (int i = 0; i < size; i++) { for (int j = 0; j < length; j++) { data[i][j] = random.nextInt(10); if (data[i][j] == number) { counter++; } } } System.out.printf("Mock: There are %d ocurrences of number %d in generated data.\n", counter, number); }
public int[] getRow(int row) { if ((row >= 0) && (row < data.length)) { return data[row]; } return null; }
查找结果类 : Results
private int data[]; public Results(int size) { data = new int[size]; }
public void setData(int position, int value) { data[position] = value; }
public int[] getData() { return data; }
搜索线程类 : Searcher (在随机数矩阵的某些行中搜索某数字)
private int firstRow; private int lastRow; private MatrixMock mock; private Results results; private int number; private final CyclicBarrier barrier; public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) { this.firstRow = firstRow; this.lastRow = lastRow; this.mock = mock; this.results = results; this.number = number; this.barrier = barrier; }
int counter; System.out.printf("%s: Processing lines from %d to %d.\n", Thread.currentThread().getName(), firstRow, lastRow); for (int i = firstRow; i < lastRow; i++) { int row[] = mock.getRow(i); counter = 0; for (int aRow : row) { if (aRow == number) { counter++; } } results.setData(i, counter); } System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName()); try { barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); }
结果汇总线程 : Grouper
private Results results; public Grouper(Results results) { this.results = results; }
int finalResult = 0; System.out.printf("Grouper: Processing results...\n"); int data[] = results.getData(); for (int number : data) { finalResult += number; } System.out.printf("Grouper: Total result: %d.\n", finalResult);
控制类 : Main
final int ROWS = 10000; final int NUMBERS = 1000; final int SEARCH = 5; final int PARTICIPANTS = 5; final int LINES_PARTICIPANT = 2000; MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH); Results results = new Results(ROWS); Grouper grouper = new Grouper(results); CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper); Searcher searchers[] = new Searcher[PARTICIPANTS]; for (int i = 0; i < PARTICIPANTS; i++) { int firstRow = i * LINES_PARTICIPANT; int lastRow = firstRow + LINES_PARTICIPANT; searchers[i] = new Searcher(firstRow, lastRow, mock, results, 5, barrier); Thread thread = new Thread(searchers[i]); thread.start(); } System.out.printf("Main: The main thread has finished.\n");
在本例中,希望知道一个随机数大矩阵中某数字出现的总次数。 为了有更好的性能,使用了分治技术,将矩阵分为5个子集,一个线程在一个子集中查找。这些线程是 Searcher 对象。
本例中使用了一个 CyclicBarrier 对象来同步 5 个线程的完成并执行 Grouper 任务来汇总各部分的结果得到最终结果。
CyclicBarrier 类有一个内部计数器来控制有多少个线程必须到达此同步点。 每当一个线程到达同步点,它调用 await() 方法告知 CyclicBarrier 对象;CyclicBarrier 将此线程催眠直到所有线程都到达同步点。
当所有线程都到达同步点,CyclicBarrier 对象唤醒所有在 await() 方法中等待的所有线程; 如果在构造函数传递了 Runnable 对象,则创建一个新线程执行以执行额外的任务。
CyclicBarrier 有另一个版本的 await() 方法