概述
在 Flink SQL 中,LIKE 子句提供了一种基于现有表定义来创建新表的高效方式。它允许开发者复用或扩展原表的结构,同时可以灵活地排除或覆盖原始表中的特定部分。
需要注意的是,与标准的 SQL 不同,Flink 中的 LIKE 子句必须在 CREATE TABLE 语句中进行定义。这是因为 LIKE 子句可以用于定义表的多个组成部分(不仅仅是 Schema),因此它被视为 CREATE 语句的一个更上层的定义单元。
可合并的属性
通过 LIKE 子句,可以控制以下表属性的合并逻辑:
- CONSTRAINTS - 主键和唯一键约束
- GENERATED - 计算列
- OPTIONS - 连接器信息、格式化方式等配置项
- PARTITIONS - 表分区信息
- WATERMARKS - watermark 定义
合并策略
对于上述属性,Flink 提供了三种明确的合并策略供你选择:
- INCLUDING - 新表将包含源表的所有指定属性。如果新表与源表的属性存在重复(例如定义了相同 Key 的配置项),则语句执行会失败。
- EXCLUDING - 新表将不包含源表的任何指定属性。
- OVERWRITING - 新表包含源表的属性,但如果出现重复项,则会用新表定义中的属性值覆盖源表中的值。
默认策略:如果在创建表时没有指定任何 LIKE 配置项,Flink 将默认采用 INCLUDING ALL OVERWRITING OPTIONS 策略。这意味着除了 OPTIONS(连接器配置)会启用覆盖行为外,其他所有属性都将被包含且不可冲突。
实例
1. 使用默认策略
LIKE 子句的一个典型应用场景是重用连接器配置,或为外部表快速添加 Watermark 定义。
首先,我们定义一个源表 kafka_sink:
String creatDDL = "CREATE TABLE kafka_sink (\n" +
" `id` INT,\n" +
" `addtime` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kafka_source',\n" +
" 'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
然后,基于此表使用 LIKE 创建新表 kafka_with_watermark,为其添加 Watermark 并修改 scan.startup.mode 配置:
String likeDDL = "CREATE TABLE kafka_with_watermark (\n" +
" WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
") WITH (\n" +
" 'scan.startup.mode' = 'latest-offset' " +
")" +
"LIKE kafka_sink";
在这个例子中,由于使用了默认策略,新表会继承原表的所有属性和配置。对于重复的 scan.startup.mode 配置,新表定义的值(latest-offset)会覆盖原表的值(earliest-offset)。
因此,最终 kafka_with_watermark 表的等效定义如下:
String likeDDL = "CREATE TABLE kafka_sink (\n" +
" `id` INT,\n" +
" `addtime` TIMESTAMP(3),\n" +
" WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kafka_source',\n" +
" 'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'latest-offset',\n" +
" 'format' = 'json'\n" +
")";
2. 使用 INCLUDING OPTIONS 策略
假设我们想要严格继承源表的所有配置项(OPTIONS),并且不允许覆盖,可以使用 INCLUDING 策略。这时,新表只能添加源表中不存在的属性。
String likeDDL = "CREATE TABLE kafka_with_watermark (\n" +
" WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
")" +
" LIKE kafka_sink (" +
" INCLUDING ALL\n" +
" \n" +
" )";
注意,使用 INCLUDING ALL 时,新表的 WITH 子句中不能包含任何与源表重复的配置,否则会报错。执行上述语句后,kafka_with_watermark 表的实际定义如下,它完整包含了 kafka_sink 的配置,并新增了 Watermark:
CREATE TABLE `default_catalog`.`default_database`.`kafka_with_watermark` (
`id` INT,
`addtime` TIMESTAMP(3),
WATERMARK FOR `addtime` AS `addtime` - INTERVAL '5' SECOND
) WITH (
'properties.bootstrap.servers' = '172.24.6.109:9092',
'connector' = 'kafka',
'format' = 'json',
'topic' = 'kafka_source',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset'
)
总结
Flink SQL 的 LIKE 子句极大地简化了相似表结构的创建工作。通过 INCLUDING、EXCLUDING 和 OVERWRITING 这三种策略,你可以精确控制新表应从源表继承哪些属性,排除哪些属性,以及在冲突时是否覆盖原有属性。这种灵活性与 Flink 在大数据处理中常需对接多种数据库/中间件(如 Kafka)的场景非常契合,能够有效提升开发效率与代码的复用性。如果想了解更多关于流处理或数据架构的实战技巧,欢迎到云栈社区与同行们深入探讨。