什么是 Window Join?
官方对 Window Join 的定义可以概括为一句话:它在 JOIN 条件中引入了窗口边界,使得关联操作只在“同一时间窗口内且键值匹配”的记录间进行。
与普通的 Regular Join 相比,Window Join 具有两个核心特点:
- 有时间窗口约束:不再是全量历史数据参与关联,而是将数据按窗口切分,在每个窗口内部进行局部 JOIN。
- 结果输出与状态清理具有时效性:与窗口聚合类似,Window Join 在窗口结束时才输出最终结果,并随之清理该窗口对应的计算状态,从而有效避免了状态的无限膨胀。
注意:Window Join 通常是 “窗口表值函数(Windowing TVF)+ JOIN” 的组合。即先使用 TUMBLE、HOP、CUMULATE 或 SESSION 等函数将表数据切分为窗口表,再对这些窗口表执行 JOIN 操作。
核心语法:Windowing TVF + JOIN
Window Join 的标准语法结构如下(以滚动窗口 TUMBLE 为例):
SELECT ...
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) AS L
JOIN (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) AS R
ON L.window_start = R.window_start
AND L.window_end = R.window_end
AND L.join_key = R.join_key;
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
使用 Window Join 必须遵循以下几个硬性规则:
- 参与方必须是 Windowing TVF 的结果:左右两侧表必须是
TUMBLE/HOP/CUMULATE/SESSION 等窗口函数处理后的结果。
- JOIN 条件必须包含窗口边界的等值匹配:
L.window_start = R.window_start
AND L.window_end = R.window_end
- 两侧的窗口类型与参数必须完全一致:例如,必须同时使用
TUMBLE 窗口,且窗口大小 INTERVAL '5' MINUTES 也需相同。
违反上述规则,查询规划器(Planner)将报错或产生非预期的语义。
完整示例:理解 Window Join 的“时间切片”机制
假设我们有两个输入表,定义如下:
CREATE TABLE LeftTable (
row_time TIMESTAMP(3),
num INT,
id STRING,
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (...);
CREATE TABLE RightTable (
row_time TIMESTAMP(3),
num INT,
id STRING,
WATERMARK FOR row_time AS row_time - INTERVAL '1' SECOND
) WITH (...);
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
表中包含如下样例数据:
- LeftTable:
2020-04-15 12:02 | 1 | L1
2020-04-15 12:06 | 2 | L2
2020-04-15 12:03 | 3 | L3
- RightTable:
2020-04-15 12:01 | 2 | R2
2020-04-15 12:04 | 3 | R3
2020-04-15 12:05 | 4 | R4
现在,我们执行一个 5 分钟滚动窗口的 FULL OUTER Window Join:
SELECT
L.num AS L_Num,
L.id AS L_Id,
R.num AS R_Num,
R.id AS R_Id,
COALESCE(L.window_start, R.window_start) AS window_start,
COALESCE(L.window_end, R.window_end) AS window_end
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) L
FULL JOIN (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) R
ON L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
窗口如何切分?
按照5分钟滚动窗口,数据被划分为:
- 窗口一
[12:00, 12:05):
- LeftTable:
L1(12:02, num=1), L3(12:03, num=3)
- RightTable:
R2(12:01, num=2), R3(12:04, num=3)
- 窗口二
[12:05, 12:10):
- LeftTable:
L2(12:06, num=2)
- RightTable:
R4(12:05, num=4)
结果分析与解读
执行上述 JOIN 后,输出结果如下:
L_Num | L_Id | R_Num | R_Id | window_start | window_end
----- | ---- | ----- | ---- | ------------------- | -------------------
1 | L1 | null | null | 2020-04-15 12:00:00 | 2020-04-15 12:05:00
null | null | 2 | R2 | 2020-04-15 12:00:00 | 2020-04-15 12:05:00
3 | L3 | 3 | R3 | 2020-04-15 12:00:00 | 2020-04-15 12:05:00
2 | L2 | null | null | 2020-04-15 12:05:00 | 2020-04-15 12:10:00
null | null | 4 | R4 | 2020-04-15 12:05:00 | 2020-04-15 12:10:00
关键点解析:
L3 与 R3 成功 JOIN:因为它们同处 [12:00,12:05) 窗口,且 num 值均为 3。
L1 与 R2 独立输出:因为它们在各自表中未找到匹配项(键值不同)。
L2(num=2) 与 R2(num=2) 为何没有 JOIN? 尽管键值相同,但 L2 属于第二个窗口 [12:05,12:10),而 R2 属于第一个窗口 [12:00,12:05)。JOIN 条件要求 window_start 和 window_end 必须相等,因此它们被视为不同业务时间段的数据,不予关联。
这正是 Window Join 的核心语义:只有处于同一时间窗口且键值相等的记录才会被配对,不同窗口间的同键记录被视为完全独立。
Window Join 支持的各种 JOIN 类型
Window Join 虽然基于窗口概念,但在结果集的保留方式上,支持所有经典的 JOIN 形态,非常适合用于实时数据处理场景。
INNER / LEFT / RIGHT / FULL OUTER Window Join
基本语法形式如下:
SELECT ...
FROM L
[LEFT | RIGHT | FULL OUTER] JOIN R
ON L.window_start = R.window_start
AND L.window_end = R.window_end
AND L.key = R.key;
- INNER JOIN:仅输出同一窗口内左右两侧均存在且匹配的记录。
- LEFT/RIGHT JOIN:保留指定侧窗口内的所有记录,另一侧无匹配则填充 NULL。
- FULL OUTER JOIN:在窗口粒度上进行“全量对账”,保留左右两侧的所有记录。
业务场景举例:
- INNER:统计每个窗口内成功匹配的订单与物流信息。
- LEFT:查找每个窗口内所有发出的告警(左表),并标记是否有对应的处理工单(右表)。
- FULL OUTER:生成对账报表,便于后续识别“仅左表存在”、“仅右表存在”或“双边存在”的数据差异。这种对账需求在大数据处理中非常常见。
SEMI Window Join:窗口内的存在性判断
语义:对于左表中的一条记录,只要在同一窗口内的右表中存在至少一条匹配记录,就保留这条左表记录。
在 Flink SQL 中,通常使用 IN 或 EXISTS 子查询来实现:
-- IN 写法
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) L
WHERE L.num IN (
SELECT num
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) R
WHERE L.window_start = R.window_start
AND L.window_end = R.window_end
);
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
或者使用 EXISTS 写法:
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) L
WHERE EXISTS (
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) R
WHERE L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end
);
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
特点与应用:
- 只关心“在这个窗口里是否存在匹配”,不关心匹配的具体条数,也不需要获取右表的字段值。
- 常用于窗口内的存在性筛查,例如:“筛选出过去5分钟内发生过支付行为的用户”、“找出最近1小时内有登录记录的设备”。
ANTI Window Join:窗口内的“未匹配”查询
语义:保留那些在指定窗口内完全找不到任何匹配的左表记录。
使用 NOT IN 实现:
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) L
WHERE L.num NOT IN (
SELECT num
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) R
WHERE L.window_start = R.window_start
AND L.window_end = R.window_end
);
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
或使用 NOT EXISTS 实现:
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) L
WHERE NOT EXISTS (
SELECT *
FROM (
SELECT * FROM TABLE(
TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES)
)
) R
WHERE L.num = R.num
AND L.window_start = R.window_start
AND L.window_end = R.window_end
);
![]https://static1.yunpan.plus/attachment/d94c275a2646.png)
典型应用场景:
- 识别窗口内未支付的订单。
- 查找窗口内未被消费者处理的消息。
- 筛选窗口内未发生特定行为的用户(用于反向营销或监控)。
Window Join 与其他流式 JOIN 的对比
为了在实际项目中做出合适的技术选型,可以简单对比几种主要的流式 JOIN:
| 类型 |
时间条件写法 |
状态规模控制 |
核心适用场景 |
| Regular Join |
仅键值条件 |
无界,依赖状态TTL清理 |
维表关联、更新较少的记录匹配 |
| Interval Join |
BETWEEN t1 - x AND t1 + y |
依赖Watermark清理过期数据 |
基于时间间隔的事件关联(如下单后X小时内支付) |
| Temporal Join |
FOR SYSTEM_TIME AS OF |
按主键关联历史版本 |
关联版本化的维表(如历史汇率表) |
| Window Join |
window_start/end 相等 + 键值条件 |
按窗口生命周期自动清理 |
按固定窗口对账、窗口内匹配统计、存在性判断 |
选择 Window Join 的典型业务信号包括:
- 业务逻辑天然是“每N分钟/小时统计或对账一次”。
- 不关心跨窗口的全局匹配,只关注每个独立窗口内的数据关联情况。
- 希望状态能周期性彻底释放,避免像 Regular Join 那样积累长尾状态带来运维压力。
使用限制与注意事项
根据官方文档和实践经验,使用 Window Join 时需要注意以下几点,它们常是容易踩坑的地方。
1. JOIN 条件的强制性
- 必须包含窗口边界等值条件:
L.window_start = R.window_start AND L.window_end = R.window_end。虽然未来版本可能简化,但目前两者都必须显式写出。
2. Windowing TVF 必须一致
- 左右两侧必须使用相同类型的窗口函数(如都是
TUMBLE)。
- 窗口函数的所有参数也必须完全一致(如HOP窗口的滑动步长
SLIDE和窗口大小SIZE需相同)。
3. 对 Session Window 的特殊限制
- 在批处理(BATCH)模式下,Session Window Join 目前不受支持。
- 在当前版本中,如果 Window Join 紧跟 Windowing TVF 之后:
- 完全支持 Tumble、Hop、Cumulate 窗口。
- Session Window 仍处于“概念支持与测试(Beta)”阶段,生产环境使用时需谨慎,并密切关注版本发布说明。对于复杂的流式计算任务,合理的窗口设计至关重要。
4. 状态压力的评估
尽管 Window Join 会释放过期窗口状态,但仍需评估:
- 窗口长度越大,窗口内需要缓存的未完成计算的数据就越多,峰值状态量可能很高。
- 在数据流量(QPS)很大的场景下,窗口内的瞬时状态量依然可能非常可观。
- 建议结合以下策略进行优化:
- 根据业务容忍度和乱序情况,选择合理的窗口大小。
- 设置合适的 Watermark 延迟,在允许一定乱序的同时避免延迟过高。
- 对状态后端(如 RocksDB)进行针对性调优,并进行充分的资源评估。
总结与最佳实践建议
最后,我们总结几条 Window Join 的实用经验:
- 贴合“时间账期”语义的业务优先考虑:如果业务本质是按固定周期(如每分钟、每小时)进行数据对账或匹配统计,Window Join 是比 Regular Join 更自然、状态更可控的选择。
- 统一使用 Windowing TVF 定义窗口:采用
TUMBLE/HOP/CUMULATE 等标准函数,配合 window_start、window_end、window_time 系统字段。这样,后续无论是窗口聚合、Window Join 还是 Window TopN,都能复用相同的窗口定义,保持语义清晰。
- 善用 SEMI/ANTI JOIN 简化逻辑:当只关心匹配“存在与否”时,使用 SEMI JOIN(存在即输出)或 ANTI JOIN(不存在才输出)。这能使 SQL 意图更明确,并避免因输出不必要字段而导致的数据膨胀。
- 窗口大小与 Watermark 需协同设计:这是一个权衡过程。
- 窗口太小:可能无法涵盖业务事件的完整周期,尤其在数据乱序严重时,匹配率会下降。
- 窗口太大:会导致窗口关闭前内存中累积的状态量激增,增加资源消耗和故障恢复成本。
- Watermark 延迟太小:系统对乱序数据的容错性差,可能过早判定窗口结束,导致数据丢失。
- Watermark 延迟太大:结果输出延迟高,影响实时性。
通过深入理解其原理、灵活运用其多种形式并规避常见陷阱,Window Join 将成为你在 Flink 流计算 SQL 中处理时间窗口关联需求的得力工具。