等待多个并发事件

Java concurrency API提供了一个类允许一个或多个线程等待直到一系列操作执行完。它就是 CountDownLatch。 此类用一个整数初始化,该整数是线程将等待的操作数目。当一个线程想等待这些操作执行完,它调用 CountDownLatch 的 await 方法。 await 方法让线程睡到操作执行完。当其中一个操作执行完,使用 countDown() 方法减少 CountDownLatch 类的内部计数器。 当减到0,CountDownLatch 类将唤醒在 await() 方法中睡眠的所有线程。

任务

使用 CountDownLatch 类实现一个视频会议系统。该系统将等待直到所有与会人都到齐了才开始。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt3.sect04 package中

  • 视频会议线程 : VideoConference

    • 声明 CountDownLatch 对象 controller 并在构造函数中传递整型参数 number 以初始化 controller
      VideoConference 类将等待参数 number 个与会者
        private final CountDownLatch controller;
    
        public VideoConference(int number) {
            controller = new CountDownLatch(number);
        }
    
    • 实现 arrive() 方法。此方法将在每个与会者到达视频会议时调用。它有一个String类型参数name。
      1. 输出到达者的信息
      2. 调用 CountDownLatch 对象 controller 的 countDown() 方法
      3. 调用 CountDownLatch 对象 controller 的 getCount() 方法获得还需要等待多少人的信息
        public void arrive(String name) {
            System.out.printf("%s has arrived.\n", name);
            controller.countDown();
            System.out.printf("VideoConference: Waiting for %d participants.\n", controller.getCount());
        }
    
    • run 方法
      1. 使用 getCount() 方法获知与会者人数信息
      2. 使用 await() 方法等待所有与会者(在代码中需要捕捉InterruptedException异常)
      3. 输出信息表示所有与会者都到达、可以开始会议了
            System.out.printf("VideoConference: Initialization: %d participants.\n", controller.getCount());
    
            try {
                controller.await();
    
                System.out.printf("VideoConference: All the participants have come.\n");
                System.out.printf("VideoConference: Let's start...\n");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
  • 与会者线程 : Participant

    • 声明 VideoConference 对象属性 conference 表示参加的会议
    • 声明 String 对象属性 name 表示与会者姓名
    • 在构造函数中初始化这2个属性
        private VideoConference conference;
        private String name;
    
        public Participant(VideoConference conference, String name) {
            this.conference = conference;
            this.name = name;
        }
    
    • run 方法
      1. 睡上一段时间
      2. 调用 VideoConference 对象的 arrive() 方法表示此与会人已到达
            long duration = (long) (Math.random() * 10);
            System.out.printf("%s: wait me for %d seconds\n", name, duration);
    
            try {
                TimeUnit.SECONDS.sleep(duration);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            conference.arrive(name);
    
  • 控制类 : Main

    • 创建一个 VideoConference 对象 conference 等待 10 个与会者
        VideoConference conference = new VideoConference(10);
    
    • 创建运行此 VideoConference 对象的线程并启动它
        Thread threadConference = new Thread(conference);
        threadConference.start();
    
    • 创建 10 个 Participant 对象,每个对象对应一个线程,启动所有线程
        for (int i = 0; i < 10; i++) {
            Participant p = new Participant(conference, "Participant" + i);
            Thread t = new Thread(p);
            t.start();
        }
    

 工作原理

CountDownLatch 类有 3 个基本元素:

  • 决定 CountDownLatch 类等待多少个事件的初始值
  • await() 方法:由等待所有事件完成的所有线程调用
  • countDown() 方法: 由执行完任务的事件调用
  1. 当创建一个 CountDownLatch 对象时,该对象使用构造函数的参数来初始化一个内部计数器。
  2. 每次一个线程调用 countDown() 方法时,CountDownLatch 对象减少内部计数器一个单位。
  3. 当内部计数器到0时,CountDownLatch 对象唤醒所有在 await() 方法中等待的线程。

不能重新初始化或修改 CountDownLatch 对象的内部计数器。当计数器被初始化,唯一能修改它的方法是 countDown() 方法。 当计数器到0,所有对 await() 方法的调用都立即返回,后续对 countDown() 方法的调用也不再有效。

和其他的同步机制相比,CountDownLatch 有以下区别:

  • CountDownLatch 机制不是被用来保护一个共享资源或临界区的。它被用来同步一个或多个有不同任务的线程
  • 它是一次性的。计数器一到0,后续的所有调用都无效了。如果要再次做同样的同步,需要创建一个新的对象。

了解更多

CountDownLatch 类有另一个版本的 await() 方法,如下:

  • await(long time, TimeUnit unit)
    线程将睡眠直到:
    • 它被中断
    • CountDownLatch的内部计数器到0
    • 过去指定的时间