找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2328

积分

1

好友

321

主题
发表于 7 天前 | 查看: 17| 回复: 0

一、概述

在 Apache Flink 中,ProcessWindowFunction 可以与 ReduceFunctionAggregateFunction 搭配使用,以实现窗口的增量聚合。这种组合模式允许数据在到达窗口时进行实时计算,当窗口关闭时,ProcessWindowFunction 将收到最终的聚合结果。如此一来,我们既能享受增量聚合带来的效率优势,又能通过 ProcessWindowFunction 获取窗口的元数据信息,为更复杂的业务逻辑处理提供了可能。

二、需求

我们将创建一个模拟数据源,持续生成订单数据。随后,基于处理时间,以5秒为间隔划分滚动窗口,并按照客户名(cusname)进行分组。在每个窗口内,我们需要对订单价格进行求和。最终输出的结果应包含:窗口的开始时间、窗口的结束时间、客户名以及该窗口内的累计价格。ProcessWindowFunction 在这里的关键作用,就是让我们能够访问到窗口的上下文信息,从而获取时间范围。

三、示例

1、数据来源

首先定义订单实体类和数据生成逻辑。

public class Orders {
    private Date addtime;
    private String cusname;
    private BigDecimal price;
    private int status;
    // 省略 getter/setter 方法
}

public class OrdersSourceObject implements GeneratorFunction<Long, Orders> {

    public OrdersSourceObject(){}

    @Override
    public Orders map(Long value) throws Exception {
        Orders order = new Orders();
        int i = new Random().nextInt(100);
        double min = 1.0;
        double max = 10.0;
        Random random = new Random();
        double randomNum = min + (max - min) * random.nextDouble();
        order.setStatus(1);
        order.setCusname("name"+i );
        order.setAddtime(new Date());
        order.setPrice(new BigDecimal(randomNum).setScale(2,BigDecimal.ROUND_HALF_UP));
        return order;
    }
}

接着,使用 Flink 的 DataGeneratorSource 来包装我们的生成器,设定生成速率。

DataGeneratorSource<Orders> dataGeneratorSource = new DataGeneratorSource<>(
        new OrdersSourceObject(),
        Long.MAX_VALUE,
        RateLimiterStrategy.perSecond(1),
        TypeInformation.of(Orders.class)
);

上述配置表示每秒生成1条订单记录,其中价格(price)是随机生成的小于10的两位小数。这种模拟数据源在测试流处理逻辑时非常有用,是 大数据 开发中的常见实践。

2、开窗并执行Reduce

核心的流处理逻辑如下:我们按cusname分组,开一个5秒的滚动处理时间窗口,然后应用reduce函数。reduce接收两个参数:第一个是执行增量聚合的ReduceFunction,第二个是处理窗口结果的ProcessWindowFunction

streamSource
    .keyBy(t -> t.getCusname())
    .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
    .reduce(new ReduceFunction<Orders>() {
        @Override
        public Orders reduce(Orders orders, Orders t1) throws Exception {
            Orders order = new Orders();
            order.setCusname(t1.getCusname());
            order.setAddtime(t1.getAddtime());
            order.setStatus(t1.getStatus());
            order.setPrice(t1.getPrice().add(orders.getPrice()));
            return order;
        }
    }, new ProcessWindowFunction<Orders, Tuple4<String, String, String, BigDecimal>, String, TimeWindow>() {
        @Override
        public void process(String s, Context context, Iterable<Orders> elements, Collector<Tuple4<String, String, String, BigDecimal>> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String windowEnd = DateFormatUtils.format(end, “yyyy-MM-dd HH:mm:ss.SSS");
            for (Orders order : elements) {
                out.collect(Tuple4.of(windowStart, windowEnd, s, order.getPrice()));
            }
        }
    }).print();

四、分析

让我们逐步拆解上述代码的执行过程:

  1. 按键分组(KeyBy):数据流按照 cusname 字段的值进行分组,确保相同客户的订单被分配到同一个逻辑流中进行处理。
  2. 开窗(Window)TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)) 定义了窗口策略。这是一个基于处理时间的滚动窗口,每个窗口的固定长度是5秒,窗口之间没有重叠。
  3. 双参数Reducereduce 方法接收了两个函数。
    • 第一个是 ReduceFunction:它负责增量聚合。每当一条新数据进入当前窗口,Flink 就会调用此函数,将新数据(t1)与已有的聚合结果(orders)进行合并。在本例中,合并操作就是价格的累加。
    • 第二个是 ProcessWindowFunction:它负责在窗口触发时被调用。此时,ReduceFunction 已经完成了窗口内所有数据的聚合,ProcessWindowFunction 接收到的 Iterable<Orders> elements 实际上只包含一个元素——最终聚合好的那个订单对象。它的任务是利用窗口上下文信息,组装最终输出。
  4. 获取窗口元数据:在 ProcessWindowFunctionprocess 方法中,通过 context.window().getStart()context.window().getEnd() 可以精确地获取到当前窗口的开始与结束时间戳,这正是在开篇需求中提到的关键信息。

五、总结

ProcessWindowFunctionReduceFunction(或 AggregateFunction)结合使用,是一种非常高效的窗口处理模式。它完美地平衡了计算效率与功能灵活性:

  • 效率方面:避免了单纯使用 ProcessWindowFunction 时需要将所有窗口元素缓存在状态中、直到窗口触发时才统一遍历计算的开销,实现了“来一条数据,计算一次”的增量聚合。
  • 功能方面:又弥补了单纯使用 ReduceFunction 无法访问窗口元数据(如起止时间、迟到数据等)的不足。

这种组合模式在实时统计、复杂事件处理等场景下应用广泛。如果你对 Java 流处理或 Flink 的更多高级特性感兴趣,欢迎在 云栈社区 交流探讨。




上一篇:SpringBoot + Redis 实现幂等性与防重复提交中间件方案
下一篇:技术人2025复盘:用Rust沉淀工程思维,借AI突破全栈瓶颈
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 09:17 , Processed in 0.292272 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表