找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1352

积分

0

好友

189

主题
发表于 15 小时前 | 查看: 2| 回复: 0

在 Flink 中,Watermark 是解决数据乱序问题的核心机制,也是决定窗口何时触发计算的关键。本文将从源码层面,深入剖析 Watermark 的定义、生成、传播以及最终触发窗口计算的全过程。

Watermark 类定义

Watermark 类的定义非常简洁,它继承了 StreamElement 类,内部仅维护一个时间戳变量。它提供了两个特殊实例:MAX_WATERMARK 表示事件时间的结束,UNINITIALIZED 表示尚未生成任何有效 Watermark。

@PublicEvolving
public class Watermark extends StreamElement {
    /** The watermark that signifies end-of-event-time. */
    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
    /** The watermark that is used before any actual watermark has been generated. */
    public static final Watermark UNINITIALIZED = new Watermark(Long.MIN_VALUE);

    /** The timestamp of the watermark in milliseconds. */
    protected final long timestamp;

    /** Creates a new watermark with the given timestamp in milliseconds. */
    public Watermark(long timestamp) {
        this.timestamp = timestamp;
    }

    /** Returns the timestamp associated with this {@link Watermark} in milliseconds. */
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public boolean equals(Object o) {
        return this == o
                || o != null
                && o.getClass() == this.getClass()
                && ((Watermark) o).timestamp == this.timestamp;
    }

    @Override
    public int hashCode() {
        return (int) (timestamp ^ (timestamp >>> 32));
    }

    @Override
    public String toString() {
        return "Watermark @ " + timestamp;
    }
}

Watermark 的处理流程

回顾 Watermark 的生成方式,通常在数据源后调用 assignTimestampsAndWatermarks 方法。

SingleOutputStreamOperator<Event> withTimestampsAndWatermarks = source
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
        );

1. 初始化与注册

当调用 assignTimestampsAndWatermarks 方法时,传入的 WatermarkStrategy 参数会被封装进一个 TimestampsAndWatermarksTransformation 中,并添加到作业的 Transformation 列表中。

在生成 StreamGraph 的过程中,会调用每个 Transformation 的 transform 方法。通过一系列调用,最终创建出 TimestampsAndWatermarksOperatorFactory。在初始化 StreamTask 时,会调用工厂方法创建 TimestampsAndWatermarksOperator 算子实例,并执行其 open 方法。

transform

open 方法中,主要完成两件事:

  1. 根据 WatermarkStrategy 创建 TimestampAssigner(用于提取事件时间戳)和 WatermarkGenerator(用于生成 Watermark)。
  2. 注册一个周期性的处理时间定时器。定时器触发时,会调用 onProcessingTime 方法。
public void onProcessingTime(long timestamp) throws Exception {
    watermarkGenerator.onPeriodicEmit(wmOutput); // 生成并发出Watermark
    final long now = getProcessingTimeService().getCurrentProcessingTime();
    getProcessingTimeService().registerTimer(now + watermarkInterval, this); // 注册下一个定时器
}

2. Watermark 的生成与发射

以常用的 BoundedOutOfOrdernessWatermarks 策略为例,其 onPeriodicEmit 方法会计算并向下游发射一个 Watermark,其时间戳为 maxTimestamp - outOfOrdernessMillis - 1。其中 maxTimestamp 是当前观察到的最大事件时间戳,outOfOrdernessMillis 是用户允许的最大乱序时间。

emitWatermark

WatermarkEmitter.emitWatermark 方法中,会更新当前算子持有的 Watermark 值。随后,RecordWriterOutput.emitWatermark 会将此 Watermark 广播给所有下游任务。

3. 下游任务对 Watermark 的处理

下游任务通过 StreamOneInputProcessor.processInput 方法处理输入数据,其中包含对 Watermark 的处理链路。

processWatermark

PipelinedSubpartitioninputWatermark 方法中,会更新对应输入通道的 Watermark 状态,并尝试在所有对齐的输入通道中寻找一个最小的 Watermark。如果这个最小值比最近一次发送的 Watermark 更大,则调用 emitWatermark 方法将其继续向下游传递。

public void emitWatermark(Watermark watermark) throws Exception {
    watermarkGauge.setCurrentWatermark(watermark.getTimestamp()); // 更新度量指标
    operator.processWatermark(watermark); // 传递给算子
}

operator.processWatermark 会调用时间服务 InternalTimeServiceManageradvanceWatermark 方法,这是触发事件时间相关操作(如窗口计算)的关键。

advanceWatermark

InternalTimerServiceImpl.tryAdvanceWatermark 方法中,逻辑清晰:

  1. 更新当前算子的 Watermark 值。
  2. 检查基于事件时间的定时器队列 (eventTimeTimersQueue)。
  3. 如果队首定时器的时间戳小于或等于当前 Watermark 的时间戳,则将其弹出队列并执行。这个过程正是窗口触发计算的本质
public boolean tryAdvanceWatermark(long time, InternalTimeServiceManager.ShouldStopAdvancingFn shouldStopAdvancingFn) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
    boolean interrupted = false;
    while ((timer = eventTimeTimersQueue.peek()) != null
            && timer.getTimestamp() <= time
            && !cancellationContext.isCancelled()
            && !interrupted) {
        keyContext.setCurrentKey(timer.getKey());
        eventTimeTimersQueue.poll();
        triggerTarget.onEventTime(timer); // 触发事件时间定时器,例如窗口计算
        taskIOMetricGroup.getNumFiredTimers().inc();
        interrupted = shouldStopAdvancingFn.test();
    }
    return !interrupted;
}

最终,Watermark 会随数据流抵达 Sink 节点。在 StreamSink 中,如果用户自定义了 Sink 并实现了相应的 Watermark 处理接口,则会调用该方法;否则,Sink 算子会忽略 Watermark。

总结

本文系统地梳理了 Apache Flink 中 Watermark 的源码实现。我们从简单的类定义入手,逐步分析了它在流处理管道中的完整生命周期:从基于 WatermarkStrategy 的初始化与周期性生成,到跨网络向下游广播,再到下游任务接收后如何通过推进事件时间时钟来触发关键的窗口计算。理解这一机制,对于构建健壮、准确的实时大数据应用至关重要。窗口触发的具体细节将在后续篇章中详细探讨。




上一篇:Express JWT认证实战指南:从登录到鉴权的完整流程与模板代码
下一篇:Nginx核心配置与实战场景解析:从负载均衡到安全管控
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-24 17:18 , Processed in 0.177444 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表