找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2328

积分

1

好友

321

主题
发表于 7 天前 | 查看: 16| 回复: 0

今天我们来深入剖析 Apache Flink 中双流操作的核心源码实现。Flink 作为业界领先的流处理框架,其提供的多种数据流关联(Join)方式是处理复杂事件逻辑的关键。

三种双流关联方式概览

通过相关技术文章的介绍,我们了解到 Flink 主要提供了三种数据流关联方式,分别是 Window JoinInterval JoinCoGroup。这三种方式各有适用场景,下面我们将逐一解读其源码层面的实现机制。

Window Join 源码实现

我们先回顾一下 Window Join 的典型使用方法。

DataStream<Tuple2<String, Double>> result = source1.join(source2)
        .where(record -> record.f0)
        .equalTo(record -> record.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(2L)))
        .apply(new JoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> join(Tuple2<String, Double> record1, Tuple2<String, Double> record2) throws Exception {
                return Tuple2.of(record1.f0, record1.f1);
            }
        });

上述 API 调用会引发一系列内部类的转换,其流程可以概括为下图:

Flink Window Join 数据处理流程示意图

核心转换发生在 WithWindowapply 方法中。你会发现,这里实际上是构建了一个 CoGroupedWindowedStream,然后转而调用它的 apply 方法。

public <T> SingleOutputStreamOperator<T> apply(
        JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    coGroupedWindowedStream =
            input1.coGroup(input2)
                    .where(keySelector1)
                    .equalTo(keySelector2)
                    .window(windowAssigner)
                    .trigger(trigger)
                    .evictor(evictor)
                    .allowedLateness(allowedLateness);

    return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);
}

关键点:从这里我们可以清晰地看出,Window Join 在底层被转换成了 CoGroup 操作 来进行处理。

那么,在 JoinCoGroupFunction 中,具体的关联逻辑又是怎样的呢?其 coGroup 方法采用了简单的两层循环遍历,将两个数据流中的元素一一配对,并调用用户自定义的 JoinFunction

private static class JoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<JoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {

    private static final long serialVersionUID = 1L;

    public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
        super(wrappedFunction);
    }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)
            throws Exception {
        for (T1 val1 : first) {
            for (T2 val2 : second) {
                out.collect(wrappedFunction.join(val1, val2));
            }
        }
    }
}

CoGroup 源码实现

CoGroup 的整体 API 调用流程与 Join 非常相似,我们直接深入其 apply 方法的核心。

public <T> SingleOutputStreamOperator<T> apply(
        CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    UnionTypeInfo<T1, T2> unionType =
            new UnionTypeInfo<>(input1.getType(), input2.getType());
    UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new UnionKeySelector<>(keySelector1, keySelector2);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
            input1.map(new Input1Tagger<T1, T2>());
    taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
    taggedInput1.returns(unionType);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
            input2.map(new Input2Tagger<T1, T2>());
    taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
    taggedInput2.returns(unionType);

    DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

    // we explicitly create the keyed stream to manually pass the key type information in
    windowedStream =
            new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

    if (trigger != null) {
        windowedStream.trigger(trigger);
    }
    if (evictor != null) {
        windowedStream.evictor(evictor);
    }
    if (allowedLateness != null) {
        windowedStream.allowedLateness(allowedLateness);
    }

    return windowedStream.apply(
            new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}

apply 方法中,Flink 先将两个输入流进行标记(Tagger)和合并(Union),然后基于合并后的流创建 KeyedStream 并设置窗口属性。最终,它会调用 windowedStream.apply 方法,并将用户传入的 function 包装成 CoGroupWindowFunction

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
        implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

    private static final long serialVersionUID = 1L;

    public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
        super(userFunction);
    }

    @Override
    public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
            throws Exception {

        List<T1> oneValues = new ArrayList<>();
        List<T2> twoValues = new ArrayList<>();

        for (TaggedUnion<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.getOne());
            } else {
                twoValues.add(val.getTwo());
            }
        }
        wrappedFunction.coGroup(oneValues, twoValues, out);
    }
}

CoGroupWindowFunction.apply 方法中,它首先将同一个 key 下的数据按来源流(oneValuestwoValues)进行分离,然后再调用 JoinCoGroupFunction.coGroup 方法完成关联。这里 values 之所以都是相同 key 的数据,是因为底层 windowState 在维护状态时,是以窗口命名空间(namespace)和 key 共同作为标识的。当窗口触发计算时,就会对相同 key 的所有数据进行处理。

Interval Join 源码实现

接下来,我们分析另一种基于时间间隔的关联方式——Interval Join。首先回顾其用法:

DataStream<Tuple2<String, Double>> intervalJoinResult = source1.keyBy(record -> record.f0)
        .intervalJoin(source2.keyBy(record -> record.f0))
        .between(Time.seconds(-2), Time.seconds(2))
        .process(new ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>() {
            @Override
            public void processElement(Tuple2<String, Double> record1, Tuple2<String, Double> record2, ProcessJoinFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>>.Context context, Collector<Tuple2<String, Double>> out) throws Exception {
                out.collect(Tuple2.of(record1.f0, record1.f1 + record2.f1));
            }
        });

从 API 可以看出,Interval Join 操作的是两个 KeyedStream,通过 between 方法定义时间间隔的上下界,最终通过 process 方法执行计算逻辑。其内部类型转换流程如下图所示:

Flink Interval Join 类型转换流程图

我们重点关注 process 方法的实现:

public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);

    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf =
            left.getExecutionEnvironment().clean(processJoinFunction);

    if (isEnableAsyncState) {
        final AsyncIntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new AsyncIntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        leftLateDataOutputTag,
                        rightLateDataOutputTag,
                        left.getType()
                                .createSerializer(
                                        left.getExecutionConfig().getSerializerConfig()),
                        right.getType()
                                .createSerializer(
                                        right.getExecutionConfig().getSerializerConfig()),
                        cleanedUdf);

        return left.connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join [Async]", outputType, operator);
    } else {
        final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        leftLateDataOutputTag,
                        rightLateDataOutputTag,
                        left.getType()
                                .createSerializer(
                                        left.getExecutionConfig().getSerializerConfig()),
                        right.getType()
                                .createSerializer(
                                        right.getExecutionConfig().getSerializerConfig()),
                        cleanedUdf);

        return left.connect(right)
                .keyBy(keySelector1, keySelector2)
                .transform("Interval Join", outputType, operator);
    }
}

实现原理:Interval Join 是基于 ConnectedStreams 实现的。ConnectedStreams 为双流操作提供了一个更通用的抽象,它将两个流组合成一个 TwoInputTransformation 并加入执行图。具体的算子(Operator)是 IntervalJoinOperator 或其异步版本 AsyncIntervalJoinOperator。这两个算子都是 TwoInputStreamOperator 的实现,分别通过 processElement1processElement2 方法处理两个输入源的数据,其核心逻辑最终都收敛到 processElement 方法。

private <THIS, OTHER> void processElement(
        final StreamRecord<THIS> record,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
        final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
        final long relativeLowerBound,
        final long relativeUpperBound,
        final boolean isLeft)
        throws Exception {

    final THIS ourValue = record.getValue();
    final long ourTimestamp = record.getTimestamp();

    if (ourTimestamp == Long.MIN_VALUE) {
        throw new FlinkException(
                "Long.MIN_VALUE timestamp: Elements used in "
                        + "interval stream joins need to have timestamps meaningful timestamps.");
    }

    if (isLate(ourTimestamp)) {
        sideOutput(ourValue, ourTimestamp, isLeft);
        return;
    }

    addToBuffer(ourBuffer, ourValue, ourTimestamp);

    for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket : otherBuffer.entries()) {
        final long timestamp = bucket.getKey();

        if (timestamp < ourTimestamp + relativeLowerBound
                || timestamp > ourTimestamp + relativeUpperBound) {
            continue;
        }

        for (BufferEntry<OTHER> entry : bucket.getValue()) {
            if (isLeft) {
                collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
            } else {
                collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
            }
        }
    }

    long cleanupTime =
            (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
    if (isLeft) {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
    } else {
        internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
    }
}

核心机制IntervalJoinOperator 内部维护了两个 MapState。每条数据到达时,会根据其时间戳被添加到对应的 MapState 中(Key 是时间戳,Value 是该时刻的元素列表)。然后,算子会遍历另一个流的 MapState,找出时间戳落在当前元素时间戳指定间隔内的所有数据,进行关联输出。最后,它会为当前数据注册一个清理定时器,当时间推进到有效范围之外时,自动清除 MapState 中过期时间戳下的数据。

总结

通过本文的源码级梳理,我们揭示了 Flink 三种双流关联操作的底层实现:

  1. Window Join:其底层是通过转换为 CoGroup 操作来实现的,最终在窗口触发时对相同key的数据进行笛卡尔积式的关联。
  2. CoGroup:其本身会将两个流合并成一个 WindowedStream,并完全依赖于 WindowState 来管理数据,在窗口计算时对分组后的数据进行关联。
  3. Interval Join:基于 ConnectedStreams 实现,其核心算子 IntervalJoinOperator 内部维护了两个 MapState,通过高效的时间范围查询来完成数据关联,这是一种不同于窗口模型的关联机制。

理解这些底层实现,对于在 开源实战 中设计高性能、高准确的流处理作业至关重要,也能帮助开发者更好地进行故障排查和性能调优。




上一篇:Flink与Spark全局变量差异实测:多线程场景下的共享状态处理
下一篇:深入解析K8s节点:DaemonSet与CRI、CNI、CSI三大接口如何管理Pod
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 09:16 , Processed in 0.297547 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表