找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

629

积分

0

好友

81

主题
发表于 5 天前 | 查看: 18| 回复: 0

概述

在 Flink SQL 中,LIKE 子句提供了一种基于现有表定义来创建新表的高效方式。它允许开发者复用或扩展原表的结构,同时可以灵活地排除或覆盖原始表中的特定部分。

需要注意的是,与标准的 SQL 不同,Flink 中的 LIKE 子句必须在 CREATE TABLE 语句中进行定义。这是因为 LIKE 子句可以用于定义表的多个组成部分(不仅仅是 Schema),因此它被视为 CREATE 语句的一个更上层的定义单元。

可合并的属性

通过 LIKE 子句,可以控制以下表属性的合并逻辑:

  • CONSTRAINTS - 主键和唯一键约束
  • GENERATED - 计算列
  • OPTIONS - 连接器信息、格式化方式等配置项
  • PARTITIONS - 表分区信息
  • WATERMARKS - watermark 定义

合并策略

对于上述属性,Flink 提供了三种明确的合并策略供你选择:

  • INCLUDING - 新表将包含源表的所有指定属性。如果新表与源表的属性存在重复(例如定义了相同 Key 的配置项),则语句执行会失败。
  • EXCLUDING - 新表将不包含源表的任何指定属性。
  • OVERWRITING - 新表包含源表的属性,但如果出现重复项,则会用新表定义中的属性值覆盖源表中的值。

默认策略:如果在创建表时没有指定任何 LIKE 配置项,Flink 将默认采用 INCLUDING ALL OVERWRITING OPTIONS 策略。这意味着除了 OPTIONS(连接器配置)会启用覆盖行为外,其他所有属性都将被包含且不可冲突。

实例

1. 使用默认策略

LIKE 子句的一个典型应用场景是重用连接器配置,或为外部表快速添加 Watermark 定义。

首先,我们定义一个源表 kafka_sink

String creatDDL = "CREATE TABLE kafka_sink (\n" +
                "  `id` INT,\n" +
                "  `addtime` TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'kafka_source',\n" +
                "  'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";

然后,基于此表使用 LIKE 创建新表 kafka_with_watermark,为其添加 Watermark 并修改 scan.startup.mode 配置:

String likeDDL = "CREATE TABLE kafka_with_watermark (\n" +
                "    WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
                ") WITH (\n" +
                " 'scan.startup.mode' = 'latest-offset' " +
                ")" +
                "LIKE kafka_sink";

在这个例子中,由于使用了默认策略,新表会继承原表的所有属性和配置。对于重复的 scan.startup.mode 配置,新表定义的值(latest-offset)会覆盖原表的值(earliest-offset)。

因此,最终 kafka_with_watermark 表的等效定义如下:

String likeDDL = "CREATE TABLE kafka_sink (\n" +
                "  `id` INT,\n" +
                "  `addtime` TIMESTAMP(3),\n" +
                "    WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'kafka_source',\n" +
                "  'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")";

2. 使用 INCLUDING OPTIONS 策略

假设我们想要严格继承源表的所有配置项(OPTIONS),并且不允许覆盖,可以使用 INCLUDING 策略。这时,新表只能添加源表中不存在的属性。

String likeDDL = "CREATE TABLE kafka_with_watermark (\n" +
                "    WATERMARK FOR addtime AS addtime - INTERVAL '5' SECOND \n" +
                ")" +
                " LIKE kafka_sink (" +
                " INCLUDING ALL\n" +
                "   \n" +
                " )";

注意,使用 INCLUDING ALL 时,新表的 WITH 子句中不能包含任何与源表重复的配置,否则会报错。执行上述语句后,kafka_with_watermark 表的实际定义如下,它完整包含了 kafka_sink 的配置,并新增了 Watermark:

CREATE TABLE `default_catalog`.`default_database`.`kafka_with_watermark` (
  `id` INT,
  `addtime` TIMESTAMP(3),
  WATERMARK FOR `addtime` AS `addtime` - INTERVAL '5' SECOND
) WITH (
  'properties.bootstrap.servers' = '172.24.6.109:9092',
  'connector' = 'kafka',
  'format' = 'json',
  'topic' = 'kafka_source',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset'
)

总结

Flink SQL 的 LIKE 子句极大地简化了相似表结构的创建工作。通过 INCLUDINGEXCLUDINGOVERWRITING 这三种策略,你可以精确控制新表应从源表继承哪些属性,排除哪些属性,以及在冲突时是否覆盖原有属性。这种灵活性与 Flink 在大数据处理中常需对接多种数据库/中间件(如 Kafka)的场景非常契合,能够有效提升开发效率与代码的复用性。如果想了解更多关于流处理或数据架构的实战技巧,欢迎到云栈社区与同行们深入探讨。




上一篇:Go密码学设计哲学:四大原则如何塑造防误用、高安全的API
下一篇:MySQL与PostgreSQL技术选型:为什么大厂既用PgSQL也离不开MySQL?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-24 01:45 , Processed in 0.342852 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表