使用有延期元素的线程安全列表

Java API提供了一个有趣的数据结构并能用于并发应用,就是 DelayQueue 。 在此类中,能存储有一个激活时间的元素。队列的返回或选取队列元素的方法将忽略激活时间尚未到达(在未来)对元素。 这些元素对那些方法是不可见的。

要有这个效果,保存到 DelayQueue 类的元素必须实现 Delayed 接口。此接口有以下2个方法:

  • compareTo(Delayed o): Delayed 接口扩展 Comparable 接口。
    • 此方法返回负数:表示此对象执行方法的延期事件小于参数的时间
    • 此方法返回正数:表示此对象执行方法的延期事件大于参数的时间
    • 0: 两者相等
  • getDelay(TimeUnit unit): 返回到激活时间的剩余时间。单位为参数指定的单位(如秒、分等)。

任务

使用 DelayQueue 类保存一些有不同激活时间的事件。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt6.sect05 package中

  • 数据类 : Event : 实现 Delayed 接口

    • 属性 startDate : java.util.Date 类型
    • 构造函数中将属性初始化,设置值为参数值
            private Date startDate;
    
            public Event(Date startDate) {
                this.startDate = startDate;
            }
    
    • compareTo() 方法 :比较当前对象的 delay 值和参数传递的值
            @Override
            public long getDelay(TimeUnit unit) {
                Date now = new Date();
                long diff = startDate.getTime() - now.getTime();
                return unit.convert(diff, TimeUnit.MILLISECONDS);
            }
    
    • getDelay() 方法: 以参数指定的单位返回 startDate 与当前时间之间的差别
            @Override
            public long getDelay(TimeUnit unit) {
                Date now = new Date();
                long diff = startDate.getTime() - now.getTime();
                return unit.convert(diff, TimeUnit.MILLISECONDS);
            }
    
  • Runnable 实现类 : Task

    • 属性 id : int : 对应线程的id
    • 属性 queue : DelayQueue<Event> 对象
    • 构造函数中将属性初始化,设置值为参数值
            private int id;
            private DelayQueue<Event> queue;
    
            public Task(int id, DelayQueue<Event> queue) {
                this.id = id;
                this.queue = queue;
            }
    
    • run() 方法 :用 add() 方法向 queue 中加入 100 个启动时间在 线程id 秒后的事件
            @Override
            public void run() {
                Date now = new Date();
                Date delay = new Date();
                delay.setTime(now.getTime() + (id * 1000));
                System.out.printf("Thread %s: %s\n", id, delay);
    
                for (int i = 0; i < 100; i++) {
                    Event event = new Event(delay);
                    queue.add(event);
                }
            }
    
  • 控制类 : Main

    1. 创建一个 DelayQueue 对象,泛型参数为 Event
    2. 创建多个 Task 对象及对应的线程
    3. 启动线程并等待线程的结束
            DelayQueue<Event> queue = new DelayQueue<>();
    
            Thread threads[] = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                Task task = new Task(i, queue);
                threads[i] = new Thread(task);
            }
    
            for (Thread thread : threads) {
                thread.start();
            }
    
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
    1. 输出队列信息
      当队列大小大于0,用 poll() 方法从队列中获取事件:
      • 如果返回 null,睡上 500 毫秒等待更多事件的激活
            do {
                int counter = 0;
                Event event;
                do {
                    event = queue.poll();
                    if (event != null) {
                        counter++;
                    }
                } while (event != null);
                System.out.printf("At %s you have read %d events\n", new Date(), counter);
    
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (queue.size() > 0);
    

讲解

本节中实现了 Event 类。其有一个属性,事件的激活时间;实现了 Delayed 接口,这样您能将 Event 对象存入 DelayQueue 类。

getDelay() 方法返回激活时间和时间时间区别的纳秒数。 两个时间都是 Date 类对象。您使用了 getTime() 方法返回一个被转换为毫秒的时间,然后把它转换成参数指定的TimeUnit单位。 DelayQueue 类的时间单位是 纳秒,但这对你是透明的。

如果执行此方法的对象的延时小于参数对象的延时,compareTo() 方法返回负值。大于则返回正值,0表示2者相等。

Task类:此类有一个名为 id 的整数属性。 当一个 Task 对象被执行,它在当前实际时间上加上等于task的ID的秒值,作为保存在此任务的DelayQueue中的事件的激活时间。 每个 Task 对象用 add() 方法保存100个事件到队列。

最后,在 Main 类中的 main() 方法中,创建了5个 Task 对象并在对应的线程中执行。 当那些线程结束执行,用 poll() 方法将所有事件输出到终端。poll() 方法获取并移除队列的第一个元素。 如果队列没有任何活跃元素,此方法返回 null。 调用 poll() 方法,如果它返回一个 Event 对象,则计数器加1。当 poll() 方法返回 null,则将计数器的值输出到终端并让线程谁上半秒等待活跃事件。 当获得了保存在队列中的500个事件,程序结束执行。

用到 size() 方法时要非常小心。它返回列表中的元素总数,包含活跃和不活跃的元素。

了解更多

DelayQueue 还有一些其他的方法,如下:

  • clear() : 移除队列中的所有元素
  • offer(E e) : E 代表 DelayQueue 类的泛型参数。此方法将参数 e 插入到队列中。
  • peek() : 获取,但不移除队列的第一个元素
  • take() : 获取并移除队列的第一个元素。如果队列中没有活跃元素,正执行此方法的线程将被阻塞直到有活跃元素。