使用非阻塞线程安全列表

最基本的集合是列表。一个列表有不固定数量的元素,能增加、读或移除任意位置的元素。 并发列表允许多个线程依次增加、移除列表中的元素,不会产生任何数据的不一致。

本节介绍如何使用在并发程序中使用非阻塞列表。 非阻塞列表提供的操作如果不能立即完成(如果想从一个空列表中获取数据),则根据操作类型返回null或抛出异常。 Java 7 提供了 ConcurrentLinkedDeque 类实现一个非阻塞并发列表。

任务

实现一个有以下2个不同任务的例子:

  • 一个往一个列表大量添加数据
  • 一个从同一个列表大量移除数据

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt6.sect02 package中

  • Runnable 实现类: AddTask

    • 属性 list : ConcurrentLinkedDeque<String> 对象
    • 构造函数将list属性初始为参数值
            private ConcurrentLinkedDeque<String> list;
    
            public AddTask(ConcurrentLinkedDeque<String> list) {
                this.list = list;
            }
    
    • run() 方法:存储10000个字符串(线程名加序号)到列表
            @Override
            public void run() {
                String name = Thread.currentThread().getName();
                for (int i = 0; i < 10000; i++) {
                    list.add(name + ": Element " + i);
                }
            }
    
  • Runnable 实现类:PollTask

    • 属性 list : ConcurrentLinkedDeque<String> 对象
    • 构造函数将list属性初始为参数值
        private ConcurrentLinkedDeque<String> list;
    
        public PollTask(ConcurrentLinkedDeque<String> list) {
            this.list = list;
        }
    
    • run() 方法:将10000个元素从list中分5000次拿出,每次从2端各拿1个
        @Override
        public void run() {
            for (int i = 0; i < 5000; i++) {
                list.pollFirst();
                list.pollLast();
            }
        }
    
  • 控制类 : Main

    1. 创建一个 ConcurrentLinkedDeque 对象,泛型参数为 String
    2. 创建一个有 100 个 线程对象的数组
        ConcurrentLinkedDeque<String> list = new ConcurrentLinkedDeque<>();
        Thread threads[] = new Thread[100];
    
    1. 创建 100 个 AddTask 对象及对应执行的线程,启动线程后等待线程结束
        for (int i=0; i<100; i++) {
            AddTask task = new AddTask(list);
            threads[i] = new Thread(task);
            threads[i].start();
        }
        System.out.printf("Main: %d AddTask threads have been launched\n",threads.length);
    
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main: Size of the List: %d\n",list.size());
    
    1. 创建 100 个 PollTask 对象及对应执行的线程,启动线程后等待线程结束
        for (int i=0; i< threads.length; i++){
            PollTask task=new PollTask(list);
            threads[i]=new Thread(task);
            threads[i].start();
        }
        System.out.printf("Main: %d PollTask threads have been launched\n",threads.length);
    
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.printf("Main: Size of the List: %d\n",list.size());
    

讲解

本节中使用了 ConcurrentLinkedDeque 对象(泛型参数为 String 类)来实现一个非阻塞并发数据列表。

首先执行了 100 个 AddTask 添加元素到列表,每个使用 add() 方法插入 10000 个元素到列表。此方法在列表末尾添加元素。 当所有任务完成后,列表有 100 万个元素。

然后执行了 100 个 PollTask 从列表中移除元素。每个任务用 pollFirst() 和 pollLast() 方法移除列表的 10000 个元素。 pollFirst() 方法返回并移除列表的第1个元素;pollLast() 方法返回并移除列表的最后一个元素。如果列表为空,这2个方法返回 null。 当所有任务结束后,列表中有0个元素。

这里用了 size() 方法来获取列表中的元素个数。小心此方法返回的值可能不是真实值,特别是当有线程在列表里添加、删除数据时使用它。 此方法遍历整个列表来统计元素个数,列表内容在此操作过程中可能会变化。 只有在没有任何线程在修改列表时使用它,才能保证返回的结果是正确的。

了解更多

ConcurrentLinkedDeque 类还提供了一些方法来从列表中获取元素:

  • getFirst() 和 getLast():返回列表的第一个和最后一个元素。
    • 不从列表移除元素
    • 如果列表是空的,抛出一个 NoSuchElementExcpetion 异常。
  • peek(), peekFirst() 和 peekLast(): 返回列表的第一个和最后一个元素。
    • 不从列表移除元素
    • 如果列表为空,返回 null
  • remove(), removeFirst(), removeLast(): 返回列表的第一个和最后一个元素。
    • 元素将从列表中被移除
    • 如果列表是空的,抛出一个 NoSuchElementExcpetion 异常。