一、概述
在 Apache Flink 中,ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用,以实现窗口的增量聚合。这种组合模式允许数据在到达窗口时进行实时计算,当窗口关闭时,ProcessWindowFunction 将收到最终的聚合结果。如此一来,我们既能享受增量聚合带来的效率优势,又能通过 ProcessWindowFunction 获取窗口的元数据信息,为更复杂的业务逻辑处理提供了可能。
二、需求
我们将创建一个模拟数据源,持续生成订单数据。随后,基于处理时间,以5秒为间隔划分滚动窗口,并按照客户名(cusname)进行分组。在每个窗口内,我们需要对订单价格进行求和。最终输出的结果应包含:窗口的开始时间、窗口的结束时间、客户名以及该窗口内的累计价格。ProcessWindowFunction 在这里的关键作用,就是让我们能够访问到窗口的上下文信息,从而获取时间范围。
三、示例
1、数据来源
首先定义订单实体类和数据生成逻辑。
public class Orders {
private Date addtime;
private String cusname;
private BigDecimal price;
private int status;
// 省略 getter/setter 方法
}
public class OrdersSourceObject implements GeneratorFunction<Long, Orders> {
public OrdersSourceObject(){}
@Override
public Orders map(Long value) throws Exception {
Orders order = new Orders();
int i = new Random().nextInt(100);
double min = 1.0;
double max = 10.0;
Random random = new Random();
double randomNum = min + (max - min) * random.nextDouble();
order.setStatus(1);
order.setCusname("name"+i );
order.setAddtime(new Date());
order.setPrice(new BigDecimal(randomNum).setScale(2,BigDecimal.ROUND_HALF_UP));
return order;
}
}
接着,使用 Flink 的 DataGeneratorSource 来包装我们的生成器,设定生成速率。
DataGeneratorSource<Orders> dataGeneratorSource = new DataGeneratorSource<>(
new OrdersSourceObject(),
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(1),
TypeInformation.of(Orders.class)
);
上述配置表示每秒生成1条订单记录,其中价格(price)是随机生成的小于10的两位小数。这种模拟数据源在测试流处理逻辑时非常有用,是 大数据 开发中的常见实践。
2、开窗并执行Reduce
核心的流处理逻辑如下:我们按cusname分组,开一个5秒的滚动处理时间窗口,然后应用reduce函数。reduce接收两个参数:第一个是执行增量聚合的ReduceFunction,第二个是处理窗口结果的ProcessWindowFunction。
streamSource
.keyBy(t -> t.getCusname())
.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
.reduce(new ReduceFunction<Orders>() {
@Override
public Orders reduce(Orders orders, Orders t1) throws Exception {
Orders order = new Orders();
order.setCusname(t1.getCusname());
order.setAddtime(t1.getAddtime());
order.setStatus(t1.getStatus());
order.setPrice(t1.getPrice().add(orders.getPrice()));
return order;
}
}, new ProcessWindowFunction<Orders, Tuple4<String, String, String, BigDecimal>, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Orders> elements, Collector<Tuple4<String, String, String, BigDecimal>> out) throws Exception {
long start = context.window().getStart();
long end = context.window().getEnd();
String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String windowEnd = DateFormatUtils.format(end, “yyyy-MM-dd HH:mm:ss.SSS");
for (Orders order : elements) {
out.collect(Tuple4.of(windowStart, windowEnd, s, order.getPrice()));
}
}
}).print();
四、分析
让我们逐步拆解上述代码的执行过程:
- 按键分组(KeyBy):数据流按照
cusname 字段的值进行分组,确保相同客户的订单被分配到同一个逻辑流中进行处理。
- 开窗(Window):
TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)) 定义了窗口策略。这是一个基于处理时间的滚动窗口,每个窗口的固定长度是5秒,窗口之间没有重叠。
- 双参数Reduce:
reduce 方法接收了两个函数。
- 第一个是
ReduceFunction:它负责增量聚合。每当一条新数据进入当前窗口,Flink 就会调用此函数,将新数据(t1)与已有的聚合结果(orders)进行合并。在本例中,合并操作就是价格的累加。
- 第二个是
ProcessWindowFunction:它负责在窗口触发时被调用。此时,ReduceFunction 已经完成了窗口内所有数据的聚合,ProcessWindowFunction 接收到的 Iterable<Orders> elements 实际上只包含一个元素——最终聚合好的那个订单对象。它的任务是利用窗口上下文信息,组装最终输出。
- 获取窗口元数据:在
ProcessWindowFunction 的 process 方法中,通过 context.window().getStart() 和 context.window().getEnd() 可以精确地获取到当前窗口的开始与结束时间戳,这正是在开篇需求中提到的关键信息。
五、总结
将 ProcessWindowFunction 与 ReduceFunction(或 AggregateFunction)结合使用,是一种非常高效的窗口处理模式。它完美地平衡了计算效率与功能灵活性:
- 效率方面:避免了单纯使用
ProcessWindowFunction 时需要将所有窗口元素缓存在状态中、直到窗口触发时才统一遍历计算的开销,实现了“来一条数据,计算一次”的增量聚合。
- 功能方面:又弥补了单纯使用
ReduceFunction 无法访问窗口元数据(如起止时间、迟到数据等)的不足。
这种组合模式在实时统计、复杂事件处理等场景下应用广泛。如果你对 Java 流处理或 Flink 的更多高级特性感兴趣,欢迎在 云栈社区 交流探讨。