运行多个任务并处理第一个结果

并发编程中一个常见的问题是: 有不同的并发任务解决一个问题时, 只需要那些任务的第一个结果. 例如想排序一个数组, 有不同的排序算法. 您可启动各种算法并获得第一个排好序的结果, 也就是对于一个给定的数组的最快的排序算法.

任务

一个用户可被2种机制校验。其中一种机制校验通过,则此用户被校验通过。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt4.sect05 package中

  • 用户校验类 : UserValidator

    • 属性 name : String 对象
    • validator 方法 : 随机睡上几秒钟后返回随机的boolean值
        public boolean validate(String name, String password) {
            Random random = new Random();
            try {
                long duration = (long) (Math.random() * 10);
                System.out.printf("Validator %s: Validating a user during %d seconds\n", this.name, duration);
                TimeUnit.SECONDS.sleep(duration);
            } catch (InterruptedException e) {
                return false;
            }
            return random.nextBoolean();
        }
    
    • 其余请参考示例代码
  • 校验任务类 : TaskValidator

    • 实现 Callable<String> 接口
    • 属性 validator : UserValidator 对象
    • 属性 user, password: String 对象
    • 覆盖 call() 方法: 如果验证不通过则输出失败信息后抛出异常;否则输出成功信息后返回 validator 的 name 属性
        @Override
        public String call() throws Exception {
            if (!validator.validate(user, password)) {
                System.out.printf("%s: The user has not been found\n", validator.getName());
                throw new Exception("Error validating user");
            }
    
            System.out.printf("%s: The user has been found\n", validator.getName());
            return validator.getName();
        }
    
    • 其余请参考示例代码
  • 控制类 : Main

    • 创建 2 个 UserValidator 对象和对应的 2 个 TaskValidator 对象
        String username = "test";
        String password = "test";
    
        UserValidator ldapValidator = new UserValidator("LDAP");
        UserValidator dbValidator = new UserValidator("DataBase");
    
        TaskValidator ldapTask = new TaskValidator(ldapValidator, username, password);
        TaskValidator dbTask = new TaskValidator(dbValidator, username, password);
    
    • 创建一个 List<TaskValidator> 对象,加入刚创建的 2 个 TaskValidator 对象
        List<TaskValidator> taskList = new ArrayList<>();
        taskList.add(ldapTask);
        taskList.add(dbTask);
    
    • 创建一个 ThreadPoolExecutor 对象
        ExecutorService executor = (ExecutorService) Executors.newCachedThreadPool();
    
    • 调用 executor 的 invokeAny() 方法. 该方法的参数是一个 Callable 对象的集合
        String result;
        try {
            result = executor.invokeAny(taskList);
            System.out.printf("Main: Result: %s\n", result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    
    • 最后用 shutdown() 方法结束执行者
        executor.shutdown();
        System.out.printf("Main: End of the Execution\n");
    

工作原理

本例的关键在Main类中。 ThreadPoolExecutor 类的 invokeAny() 方法接受一个任务列表,启动它们并返回第一个没有抛出异常正常结束的任务的结果. 此方法返回的数据类型和启动的任务的 call() 方法返回的数据类型一致。本例中返回String。

本例有 2 个返回随机 boolean 值的 UserValidator 对象。每个 UserValidator 对象被用于一个 Callable(TaskValidator类)对象。 如果 UserValidator 类的 validate() 方法返回 false, TaskValidator 类抛出异常;否则它返回 true.

这样我们有2个可能返回 true 或 抛出一个 Exception 异常的任务,有以下 4 种可能性:

  • 都返回 true : invokeAny() 方法的结果是首先结束的任务名称。
  • 第一个返回true, 第二个抛出异常 : invokeAny() 方法的结果是第一个任务的名称。
  • 第一个抛出异常,第二个返回true : invokeAny() 方法的结果是第二个任务的名称。
  • 两个任务都抛出异常: invokeAny() 方法抛出一个 ExecutionException 异常。

了解更多

ThreadPoolExecutor 类提供了另一个版本的 invokeAny() 方法:

  • invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 此方法执行所有任务并返回第一个在给定的时间期限内没有抛出异常、正常结束的任务的结果