Java Fork Join 框架

对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。

CompletableFuture

runAsync 和 supplyAsync 方法

CompletableFuture 提供了四个静态方法来创建一个异步操作。

1
2
3
4
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定 Executor 的方法会使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync 方法不支持返回值。
  • supplyAsync 可以支持返回值。

CompletionService

CompletionService 是 Java 并发包(java.util.concurrent)中的一个接口,主要用于 批量提交任务后按完成顺序获取结果
常见实现类是 ExecutorCompletionService

✅ 适用场景:当你提交多个异步任务,希望谁先完成先处理谁,而不是按提交顺序等待结果。

主要组成

CompletionService 结合了两个组件:

  1. Executor —— 用于执行任务(通常是 ThreadPoolExecutor)。
  2. BlockingQueue —— 存放已完成任务的结果,方便按完成顺序获取。

常用方法

方法 作用
submit(Callable<V> task) 提交任务,返回一个 Future
take() 阻塞等待并获取下一个完成的任务
poll() 非阻塞地获取下一个完成的任务(可能返回 null)

简单示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.*;

public class CompletionServiceDemo {
public static void main(String[] args) throws Exception {
// 1. 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);

// 2. 创建 CompletionService
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);

// 3. 提交多个任务
for (int i = 1; i <= 5; i++) {
int taskId = i;
completionService.submit(() -> {
// 模拟不同任务耗时
long sleepTime = (long) (Math.random() * 3000);
Thread.sleep(sleepTime);
return "任务 " + taskId + " 完成,耗时 " + sleepTime + " ms";
});
}

// 4. 按完成顺序获取结果
for (int i = 0; i < 5; i++) {
Future<String> future = completionService.take(); // 阻塞直到有任务完成
System.out.println(future.get());
}

// 5. 关闭线程池
executor.shutdown();
}
}

🧾 输出示例(随机顺序):

1
2
3
4
5
任务 3 完成,耗时 420 ms
任务 1 完成,耗时 880 ms
任务 5 完成,耗时 1320 ms
任务 2 完成,耗时 1900 ms
任务 4 完成,耗时 2780 ms

🚀总结要点

  • ExecutorService 按提交顺序取结果;
  • CompletionService 按完成顺序取结果;
  • 内部用一个阻塞队列保存已完成任务;
  • 常用于批量爬虫、批量请求、数据聚合等需要快速响应的场景。

是否希望我帮你把这份笔记整理成 Markdown 格式(带高亮与小标题,方便你直接放进笔记软件或博客)?

Fork/Join

概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解。

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
面定义了一个对 1~n 之间的整数求和的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask < Integer > {

int n;

public AddTask1(int n) {
this.n = n;
}

@Override
public String toString() {
return "{" + n + '}';
}

@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}

// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);

// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}

然后提交给 ForkJoinPool 来执行

1
2
3
4
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}

结果:

1
2
3
4
5
6
7
8
9
10
[ForkJoinPool-1-worker-0] - fork() 2 + {1} 
[ForkJoinPool-1-worker-1] - fork() 5 + {4}
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
[ForkJoinPool-1-worker-2] - fork() 4 + {3}
[ForkJoinPool-1-worker-3] - fork() 3 + {2}
[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
[ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15