Java Fork Join 框架
对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。
CompletableFuture
runAsync 和 supplyAsync 方法
CompletableFuture 提供了四个静态方法来创建一个异步操作。
1 2 3 4
| public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
|
没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。
- runAsync 方法不支持返回值。
- supplyAsync 可以支持返回值。
Fork/Join
概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
面定义了一个对 1~n 之间的整数求和的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask < Integer > {
int n;
public AddTask1(int n) { this.n = n; }
@Override public String toString() { return "{" + n + '}'; }
@Override protected Integer compute() { if (n == 1) { log.debug("join() {}", n); return n; }
AddTask1 t1 = new AddTask1(n - 1); t1.fork(); log.debug("fork() {} + {}", n, t1);
int result = n + t1.join(); log.debug("join() {} + {} = {}", n, t1, result); return result; } }
|
然后提交给 ForkJoinPool 来执行
1 2 3 4
| public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(4); System.out.println(pool.invoke(new AddTask1(5))); }
|
结果:
1 2 3 4 5 6 7 8 9 10
| [ForkJoinPool-1-worker-0] - fork() 2 + {1} [ForkJoinPool-1-worker-1] - fork() 5 + {4} [ForkJoinPool-1-worker-0] - join() 1 [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3 [ForkJoinPool-1-worker-2] - fork() 4 + {3} [ForkJoinPool-1-worker-3] - fork() 3 + {2} [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6 [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10 [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15 15
|