Java 8 的 Stream API 堪称函数式编程的一大利器,但它有一个被无数开发者吐槽的痛点:想自定义一个复杂的中间操作,怎么就这么难?灵魂拷问来了:你是不是也曾在 filter、map、sorted 这些内置操作之外,为了一个特定逻辑而绞尽脑汁,最后只能用终端操作 Collector 来曲线救国,或者写出一长串既臃肿又难维护的代码?
好消息是,这个困扰我们多年的空白,终于被 Java 24 引入的 Stream Gatherers 填补了!
Gatherers 是什么?
官方给出的定义非常直接:Gatherer 是一种中间操作,能够将输入流转换为输出流,并且可以在流结束时执行最终操作。
简单来说:
Collector 是自定义终端操作的工具。
Gatherer 就是自定义中间操作的工具。
有了它,我们终于可以像定义 Collector 那样,自由地定义各种中间转换逻辑。它强大到支持:
- 一对多、多对一、多对多的元素转换。
- 跟踪已处理的元素状态,并影响后续的转换逻辑。
- 原生支持并行处理流数据。
告别传统痛点,感受 Gatherers 的简洁
让我们通过一个具体场景来感受它的威力:将序列 [0,1,...,9] 按固定大小 3 进行分组(窗口),并且最多只取前 2 组,期望输出 [[0,1,2], [3,4,5]]。
传统 Stream API 的实现
IntStream.range(0, 10)
.boxed()
.limit(3 * 2)
.collect(Collector.of(
() -> new ArrayList<ArrayList<Integer>>(),
(groups, element) -> {
if (groups.isEmpty() || groups.getLast().size() == 3) {
var current = new ArrayList<Integer>();
current.add(element);
groups.addLast(current);
} else {
groups.getLast().add(element);
}
},
(left, right) -> {
throw new UnsupportedOperationException("无法并行化");
}
))
.forEach(System.out::println);
这段代码不仅冗长,而且为了实现一个简单的窗口功能,我们不得不手动维护一个状态(groups),并行支持也成了问题,可读性和可维护性都不高。
使用 Stream Gatherers 实现
IntStream.range(0, 10)
.boxed()
.gather(Gatherers.windowFixed(3)) // 固定窗口聚合
.limit(2)
.forEach(System.out::println);
看到区别了吗?gather(Gatherers.windowFixed(3)) 仅仅一行,就优雅地解决了所有问题。逻辑清晰,意图明确,这才是我们想要的流式编程体验!
Gatherers 全家福:覆盖 80% 的常用场景
Java 贴心地为我们内置了一批开箱即用的 Gatherers,足以应对大多数常见需求。
1. 固定窗口:windowFixed
按指定大小切割流,保持元素顺序,最后一个窗口可能不满。
// 输入:0,1,2,3,4,5,6,7,8,9
.stream()
.gather(Gatherers.windowFixed(3))
// 输出:[0,1,2], [3,4,5], [6,7,8], [9]
2. 滑动窗口:windowSliding
窗口每次滑动一个元素,每个新窗口包含前一个窗口的后 n-1 个元素。
// 输入:0,1,2,3,4,5
.stream()
.gather(Gatherers.windowSliding(3))
// 输出:[0,1,2], [1,2,3], [2,3,4], [3,4,5]
3. 折叠聚合:fold
类似于终端操作 reduce,但 fold 是中间操作,聚合后流不会关闭,可以继续处理。
List.of("a", "b", "c", "d", "e")
.stream()
.gather(Gatherers.fold(() -> "", String::concat))
.map(String::toUpperCase)
.forEach(System.out::println); // 输出:ABCDE
4. 前缀扫描:scan
与 fold 类似,但 scan 会输出每一步的中间聚合结果,非常适合需要追踪过程数据的自定义中间操作。
List.of("a", "b", "c", "d", "e")
.stream()
.gather(Gatherers.scan(() -> "", String::concat))
.map(String::toUpperCase)
.forEach(System.out::println); // 输出:A, AB, ABC, ABCD, ABCDE
5. 并发映射:mapConcurrent
利用虚拟线程并发执行 map 操作,专为 I/O 密集型场景(如网络请求、文件读写)优化,能显著提升吞吐量。
.stream()
.gather(Gatherers.mapConcurrent(5, element -> {
// I/O 操作:比如接口调用、文件读取
return process(element);
}))
自定义 Gatherers:应对终极复杂场景
当内置 Gatherers 无法满足你的独特需求时,终极武器——自定义 Gatherer 就派上用场了。例如,实现一个“只保留递增元素”的 Gatherer:
// 自定义:只保留比上一个元素大的数
Gatherer<Integer, int[], Integer> INCREASING_ONLY =
Gatherer.ofSequential(
() -> new int[1], // 状态:记录上一个元素
(state, input, downstream) -> {
if (input > state[0]) {
state[0] = input; // 更新状态
downstream.push(input); // 发送给下游
}
return true; // 继续处理
}
);
// 使用自定义 Gatherer
List.of(1,3,2,5,4,6,8,7,9)
.stream()
.gather(INCREASING_ONLY)
.forEach(System.out::println); // 输出:1,3,5,6,8,9
重要提示:大多数需求都可以通过组合现有Stream API中间操作或使用内置Gatherers实现。自定义 Gatherer 是最后的手段,请优先考虑更简单的方案。
传统 Stream API VS Stream Gatherers
| 特性 |
传统 Stream API |
Stream Gatherers |
| 自定义中间操作 |
❌ 不支持,需终端操作兜底 |
✅ 原生支持,可组合 |
| 窗口处理 |
❌ 需手动 collect 实现,复杂 |
✅ 内置 windowFixed/windowSliding |
| 中间聚合结果 |
❌ 只能终端操作获取最终结果 |
✅ scan 支持输出每步聚合结果 |
| 并发映射(I/O 场景) |
❌ parallelStream 依赖 ForkJoinPool |
✅ mapConcurrent 用虚拟线程,高效 |
| 代码简洁度 |
❌ 复杂场景冗长 |
✅ 一行搞定复杂转换 |
| 并行处理兼容性 |
❌ 部分手动实现不支持并行 |
✅ 原生支持并行,可指定特性 |
总结
Stream Gatherers 远不止是 API 的简单补充,它是让 Java Stream API 完成从“半灵活”到“全灵活”蜕变的关键升级。它彻底打破了传统流式 API 在有状态操作和复杂输入输出映射上的局限性,让我们的代码能够更加简洁、优雅且易于维护。
- 日常开发:用内置的 Gatherers 替代那些繁琐的
collect 逻辑。
- 复杂场景:通过自定义 Gatherer 实现专属的、可复用的中间操作。
- I/O 密集型任务:利用
mapConcurrent 充分发挥虚拟线程的威力,提升程序效率。
可以说,掌握了 Stream Gatherers,你才真正掌握了 Java 流式编程的完整武器库。这项特性正在引领 Java 函数式编程进入一个更强大、更自由的新阶段。如果你想了解更多关于Java新特性的深度解析和实践技巧,欢迎到云栈社区与更多开发者一起交流探讨。