使用阻塞的线程安全列表

最基本的集合是列表。一个列表有不固定数量的元素,能增加、读或移除任意位置的元素。 并发列表允许多个线程依次增加、移除列表中的元素,不会产生任何数据的不一致。

本节介绍如何使用在并发程序中使用 阻塞 列表。 阻塞列表和非阻塞列表的主要区别在于列表的添加、移除数据操作,如果不能立即执行(如因为列表满了或为空),调用的线程被阻塞直到操作能执行。 Java 提供了 LinkedBlockingDeque 类实现一个阻塞列表。

任务

实现一个有以下2个不同任务的例子:

  • 一个往一个列表大量添加数据
  • 一个从同一个列表大量移除数据

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt6.sect03 package中

  • Runnable 实现类 :Client

    • 属性 requestList : LinkedBlockingDeque<String> 对象
    • 在构造函数中将属性值初始化为参数值
        private LinkedBlockingDeque<String> requestList;
    
        public Client(LinkedBlockingDeque<String> requestList) {
            this.requestList = requestList;
        }
    
    • run() 方法 : 使用 put() 方法每秒插入 5 个字符串对象到 requestList,重复 3 次
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j < 5; j++) {
                    String request = "" + i + ":" + j;
                    try {
                        requestList.put(request);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("Client: %s at %s.\n", request, new Date());
                }
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.printf("Client: End.\n");
        }
    
  • 控制类 : Main

    1. 创建一个 LinkedBlockingDeque 对象,泛型参数为 String 类。 列表大小为3
        LinkedBlockingDeque<String> list=new LinkedBlockingDeque<>(3);
    
    1. 创建一个 Client 对象及对应的线程,并启动线程
        Client client = new Client(list);
        Thread thread = new Thread(client);
        thread.start();
    
    1. 用 take() 方法每 300 毫秒从 list 对象中取 3 个字符串,重复 5 次
        for (int i = 0; i < 5; i++) {
            for (int j = 0; j < 3; j++) {
                String request = null;
                try {
                    request = list.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    continue;
                }
                System.out.printf("Main: Request: %s at %s. Size: %d\n", request, new Date(), list.size());
            }
    
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        System.out.printf("Main: End of the program.\n");
    

讲解

本节中使用了 LinkedBlockingDeque (泛型参数为 String 类) 来实现一个阻塞并发数据列表。

Client 类使用 put() 方法插入数据到列表中。如果列表满了(因为是用固定大小创建的),此方法将阻塞线程的执行直到列表中有空间。

Main 类使用 take() 方法从列表中获取字符串。如果列表为空,此方法将阻塞线程的执行直到列表中有元素。

LinkedBlockingDeque 类的这2个方法在阻塞时被中断,都会抛出 InterruptedException 异常,所有您必须包含必要的代码来捕捉异常。

了解更多

LinkedBlockingDeque 类也提供其他的方法从列表里获取、设置元素时。如下:

  • takeFirst() 和 takeLast():分别返回列表的第一个和最后一个元素。
    • 将元素从列表中移除
    • 如果列表为空,阻塞线程直到列表中有元素
  • getFirst() 和 getLast(): 分别返回列表的第一个和最后一个元素。
    • 不从列表移除元素
    • 如果列表为空,抛出 NoSuchElementException 异常
  • peek(), peekFirst() 和 peekLast(): 分别返回列表的第一个和最后一个元素。
    • 不从列表移除元素
    • 如果列表为空,返回 null
  • poll(), pollFirst() 和 pollLast(): 分别返回列表的第一个和最后一个元素。
    • 将元素从列表中移除
    • 如果列表为空,返回 null
  • add(), addFirst(), addLast(): 分别添加一个元素到列表的第一个和最后一个位置。
    • 如果列表满了,抛出 IllegalStateException 异常