找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2688

积分

0

好友

375

主题
发表于 15 小时前 | 查看: 0| 回复: 0

概述

ChunJun(原 FlinkX)是一款基于 Apache Flink 打造的分布式数据同步工具,专注于异构数据源间的高效数据迁移与实时同步。它支持包括 MySQL、Oracle、Hive、HBase、Kafka 在内的多种数据源,并提供批流一体的数据处理能力,因此在数据仓库构建、实时数仓同步以及数据集成等场景中被广泛应用。

前提条件

在开始操作之前,请确保您的环境已满足以下条件:

  1. 已完成 ChunJun 的安装。
  2. Zookeeper 和 Kafka 服务已安装并运行。
  3. 已在 Kafka 中创建好主题 mk

编写SQL文件

核心任务是通过 SQL 定义数据源(Source)和数据去向(Sink)。我们将创建一个 SQL 文件(例如 kafka_stream.sql),内容如下:

CREATE TABLE source (
    id INT,
    name STRING,
    create_time timestamp
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'mk'
      ,'properties.bootstrap.servers' = '172.24.6.109:9092'
      ,'properties.group.id' = 'mk'
      ,'format' = 'json'
      ,'json.timestamp-format.standard' = 'SQL'
      ,'scan.parallelism' = '1'
      );

CREATE TABLE sink
(
    id INT,
    name STRING,
    create_time timestamp
) WITH (
      'connector' = 'stream-x'
      );

INSERT INTO sink
SELECT *
from source;

以下是几个关键参数的解释:

属性 说明
topic mk 指定要消费的 Kafka 主题名称。
properties.bootstrap.servers 172.24.6.109:9092 Kafka 集群的地址。
format json 指定消息的格式为 JSON。

准备测试数据

我们需要向 Kafka 主题 mk 中写入一些模拟的 JSON 格式数据,作为数据源。

使用 kafka-console-producer.sh 工具执行以下命令,并按行输入数据:

# kafka-console-producer.sh --broker-list 172.24.6.109:9092 --topic mk
>{"id":1,"name":"张三","create_time":"2025-10-01 12:09:33"}
>{"id":2,"name":"李四","create_time":"2025-10-02 13:08:22"}
>{"id":3,"name":"王五","create_time":"2025-10-03 14:07:11"}

执行效果如下图所示:
Kafka生产者命令行写入测试数据

运行与测试

1. 启动ChunJun任务

在 ChunJun 的安装目录下,运行本地模式执行脚本,指定我们编写的 SQL 文件:

bin/chunjun-local.sh -job chunjun-examples/test/kafka_stream.sql

2. 验证实时同步

当 ChunJun 任务成功启动并运行后,它便处于监听状态。此时,如果向 Kafka 的 mk 主题中写入新的数据(例如通过另一个生产者或程序),ChunJun 会实时读取并处理这些数据。任务的控制台会打印出同步的结果,表明数据已从 Kafka 被成功读取并写入到定义的 Sink 表中。

配置数据读取的起始位置

在某些场景下,我们可能需要控制从 Kafka 的哪个位置开始消费数据。这可以通过 scan.startup.mode 参数来实现。

例如,如果你想从最早的消息开始消费,可以在定义 source 表的 WITH 参数中添加:

'scan.startup.mode' = 'earliest-offset'

scan.startup.mode 支持多种模式,具体如下:

注释
group-offsets (默认) 从 ZK / Kafka brokers 中指定的消费组已经提交的 offset 开始消费。
earliest-offset 从最早的偏移量开始消费。
latest-offset 从最新的偏移量开始消费。
timestamp 从每个分区的指定的时间戳开始消费。
specific-offsets 从每个分区的指定的特定偏移量开始消费。

通过灵活运用 ChunJun 的 SQL 配置,我们可以轻松实现从 Kafka 到多种目标的数据同步任务,为实时数据处理管道提供强大支持。如果你想了解更多大数据生态的技术实践,欢迎到云栈社区交流探讨。




上一篇:AI编程与Agent工程师崛起,传统前后端分工模式面临重塑
下一篇:RT-Thread Vector软件包:为STM32等MCU设计的动态数组与内存管理方案
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-26 18:42 , Processed in 0.282412 second(s), 43 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表