异步运行任务

在 ForkJoinPool 中执行 ForkJoinTask,可以同步或异步方式执行。

  • 同步方式:此方法将任务提交到池后直到任务结束执行了才返回
  • 异步方式:此方法将任务提交到池后立即返回,方法继续执行

两者有一个大的不同:

  • 同步方式:调用同步方法之一(如 invokeAll 方法)的任务被阻塞直到它送到池的任务结束执行。 这允许 ForkJoinPool 类使用工作窃取算法分配一个新任务给被阻塞在睡眠的工作线程。
  • 异步方式:当您使用异步方法(如 fork 方法),任务继续执行,所以 ForkJoinPool 类不能使用 工作窃取算法 来提高应用的性能。 此时,只有当您调用 join() 或 get() 方法来等待一个任务结束时,ForkJoinPool 类才能使用那个算法。

任务

实现一个程序在一个文件夹及其子文件夹中搜索有指定后缀的文件。 您将实现的 ForkJoinTask 类将处理文件夹的内容。对于文件夹的每个子文件夹,它将以异步方式提交一个新任务到 ForkJoinPool 类。 对文件夹中的每个文件,检查文件的后缀,如果匹配则加到结果列表。

实现

本节的示例代码在 com.elanzone.books.noteeg.chpt5.sect04 package中

  • 文件夹处理类 : FolderProcessor : extends RecursiveTask<List<String>>

    • 声明类的序列版本号(因为 RecursiveTask 的父类 ForkJoinTask 实现了 Serializable 接口)
            private static final long serialVersionUID = 1L;
    
    • 属性 path : String 对象,任务将要处理的文件夹的全路径
    • 属性 extension : String 对象,任务将要查找文件的后缀
    • 在构造函数中初始化属性为参数值
            private String path;
            private String extension;
    
            public FolderProcessor(String path, String extension) {
                this.path = path;
                this.extension = extension;
            }
    
    • compute() 方法 (因为 RecursiveTask 的泛型参数为List<String>, 故此方法返回List<String>)
      • 声明一个 List<String> 对象保留查找结果
      • 声明一个 List<FolderProcessor> 对象保存将要处理子文件夹的子任务
      • 获取文件夹的内容逐个遍历
        • 如果是子文件夹,则创建一个新的 FolderProcessor 对象并用 fork() 方法异步执行
        • 否则用 checkFile() 方法比较文件的扩展名与指定的扩展名,如果一致则将其全路径保存到结果字符串列表
      • 如果 FolderProcessor 子任务数超过 50 ,则在终端输出信息表明此事实
      • 调用 addResultsFromTask() 方法将子任务返回的结果加入到结果列表
      • 返回结果
            @Override
            protected List<String> compute() {
                List<String> list = new ArrayList<>();
    
                List<FolderProcessor> tasks = new ArrayList<>();
    
                File file = new File(path);
                File content[] = file.listFiles();
    
                if (content != null) {
                    for (File aContent : content) {
                        if (aContent.isDirectory()) {
                            FolderProcessor task = new FolderProcessor(aContent.getAbsolutePath(), extension);
                            task.fork();
                            tasks.add(task);
                        } else {
                            if (checkFile(aContent.getName())) {
                                list.add(aContent.getAbsolutePath());
                            }
                        }
                    }
                }
                if (tasks.size() > 50) {
                    System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
                }
                addResultsFromTasks(list, tasks);
                return list;
            }
    
    • addResultsFromTasks() 方法:对列表中的任务逐个调用 join() 方法等待其结束并返回任务结果,将结果加入到结果列表
            private void addResultsFromTasks(List<String> list,
                                             List<FolderProcessor> tasks) {
                for (FolderProcessor item : tasks) {
                    list.addAll(item.join());
                }
            }
    
    • checkFile() 方法:检查参数是否以指定扩展结尾
            private boolean checkFile(String name) {
                return name.endsWith(extension);
            }
    
  • 控制类 :Main

    • 使用默认构造函数创建一个 ForkJoinPool 对象
    • 创建 3 个 FolderProcessor 任务,用不同的文件夹路径初始它们
            ForkJoinPool pool = new ForkJoinPool();
    
            FolderProcessor system = new FolderProcessor("C:\\Windows", "log");
            FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log");
            FolderProcessor documents = new FolderProcessor("C:\\Documents And Settings", "log");
    
    • 使用 execute() 方法执行 pool 中的3个任务
            pool.execute(system);
            pool.execute(apps);
            pool.execute(documents);
    
    • 每秒输出 pool 信息到终端,直到3个任务结束运行
            do {
                System.out.printf("******************************************\n");
                System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
                System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
                System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
                System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
                System.out.printf("******************************************\n");
    
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            } while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone()));
    
    • 用 shutdown() 方法停止 ForkJoinPool
            pool.shutdown();
    
    • 将每个任务的结果输出到终端
            List<String> results;
    
            results = system.join();
            System.out.printf("System: %d files found.\n", results.size());
    
            results = apps.join();
            System.out.printf("Apps: %d files found.\n", results.size());
    
            results = documents.join();
            System.out.printf("Documents: %d files found.\n", results.size());
    

讲解

本例的关键在 FolderProcessor 类。每个任务处理一个文件夹的内容。内容有以下2中元素:

  • 文件
  • 文件夹

如果任务找到一个文件夹,它创建另一个 Task 对象来处理那个文件夹并用 fork() 方法将其提交到池子里。 池子如果有空闲的工作线程则直接用,否则创建一个新的线程来执行此任务。fork() 方法立即返回,所以任务能持续处理文件夹的内容。 对每个文件,任务将比较它的后缀是否与要查找的一致,如果一致则将文件名加入到结果列表。

当任务已经处理了被分配的文件夹的所有内容,它用 join() 方法等待所有它提交到池子里的任务结束。 join() 方法等待被调用的任务运行结束,并返回由 compute() 方法的返回值。 任务将所有子任务及自己的结果汇总并将此列表作为 compute() 方法的返回值返回。

ForkJoinPool 类允许任务以异步的方式执行。您用 execute() 方法送了3个任务到池子中。 在 Main 类中,您用 shutdown() 方法结束池子并将在池子中运行的任务的状态情况输出。 ForkJoinPool 类还有更多的方法可用。详见后述。

了解更多

本例使用了 join() 方法等待任务的结束并获得其结果。也可以使用以下2个版本的 get() 方法:

  • get() : 如果 ForkJoinTask 已结束运行则返回 compute() 方法的返回值,否则等待直到它完成
  • get(long timeout, TimeUnit unit): 如果任务的结果尚未出来,则最多等待指定时间。 如果指定时间已过,但结果仍未出来,则返回 null。

get() 和 join() 方法有2个主要区别:

  • join() 方法不能被中断。如果中断调用了 join() 方法的线程,此方法将抛出 InterruptedException 异常。
  • 如果任务抛出任意不受检查的异常
    • get() 方法将抛出 ExecutionException 异常
    • join() 方法将抛出 RuntimeException 异常