概述
ChunJun(原 FlinkX)是一款基于 Apache Flink 打造的分布式数据同步工具,专注于异构数据源间的高效数据迁移与实时同步。它支持包括 MySQL、Oracle、Hive、HBase、Kafka 在内的多种数据源,并提供批流一体的数据处理能力,因此在数据仓库构建、实时数仓同步以及数据集成等场景中被广泛应用。
前提条件
在开始操作之前,请确保您的环境已满足以下条件:
- 已完成 ChunJun 的安装。
- Zookeeper 和 Kafka 服务已安装并运行。
- 已在 Kafka 中创建好主题
mk。
编写SQL文件
核心任务是通过 SQL 定义数据源(Source)和数据去向(Sink)。我们将创建一个 SQL 文件(例如 kafka_stream.sql),内容如下:
CREATE TABLE source (
id INT,
name STRING,
create_time timestamp
) WITH (
'connector' = 'kafka-x'
,'topic' = 'mk'
,'properties.bootstrap.servers' = '172.24.6.109:9092'
,'properties.group.id' = 'mk'
,'format' = 'json'
,'json.timestamp-format.standard' = 'SQL'
,'scan.parallelism' = '1'
);
CREATE TABLE sink
(
id INT,
name STRING,
create_time timestamp
) WITH (
'connector' = 'stream-x'
);
INSERT INTO sink
SELECT *
from source;
以下是几个关键参数的解释:
| 属性 |
值 |
说明 |
topic |
mk |
指定要消费的 Kafka 主题名称。 |
properties.bootstrap.servers |
172.24.6.109:9092 |
Kafka 集群的地址。 |
format |
json |
指定消息的格式为 JSON。 |
准备测试数据
我们需要向 Kafka 主题 mk 中写入一些模拟的 JSON 格式数据,作为数据源。
使用 kafka-console-producer.sh 工具执行以下命令,并按行输入数据:
# kafka-console-producer.sh --broker-list 172.24.6.109:9092 --topic mk
>{"id":1,"name":"张三","create_time":"2025-10-01 12:09:33"}
>{"id":2,"name":"李四","create_time":"2025-10-02 13:08:22"}
>{"id":3,"name":"王五","create_time":"2025-10-03 14:07:11"}
执行效果如下图所示:

运行与测试
1. 启动ChunJun任务
在 ChunJun 的安装目录下,运行本地模式执行脚本,指定我们编写的 SQL 文件:
bin/chunjun-local.sh -job chunjun-examples/test/kafka_stream.sql
2. 验证实时同步
当 ChunJun 任务成功启动并运行后,它便处于监听状态。此时,如果向 Kafka 的 mk 主题中写入新的数据(例如通过另一个生产者或程序),ChunJun 会实时读取并处理这些数据。任务的控制台会打印出同步的结果,表明数据已从 Kafka 被成功读取并写入到定义的 Sink 表中。
配置数据读取的起始位置
在某些场景下,我们可能需要控制从 Kafka 的哪个位置开始消费数据。这可以通过 scan.startup.mode 参数来实现。
例如,如果你想从最早的消息开始消费,可以在定义 source 表的 WITH 参数中添加:
'scan.startup.mode' = 'earliest-offset'
scan.startup.mode 支持多种模式,具体如下:
| 值 |
注释 |
group-offsets (默认) |
从 ZK / Kafka brokers 中指定的消费组已经提交的 offset 开始消费。 |
earliest-offset |
从最早的偏移量开始消费。 |
latest-offset |
从最新的偏移量开始消费。 |
timestamp |
从每个分区的指定的时间戳开始消费。 |
specific-offsets |
从每个分区的指定的特定偏移量开始消费。 |
通过灵活运用 ChunJun 的 SQL 配置,我们可以轻松实现从 Kafka 到多种目标的数据同步任务,为实时数据处理管道提供强大支持。如果你想了解更多大数据生态的技术实践,欢迎到云栈社区交流探讨。