在现代数据架构中,实现跨异构数据库的数据同步是构建数据仓库、支持实时分析的关键环节。Apache SeaTunnel作为一个高性能的数据集成平台,为这类需求提供了简洁高效的解决方案。本文将详细演示如何使用SeaTunnel完成MySQL与OceanBase数据库之间的数据同步,涵盖一次性批处理和基于变更数据捕获(CDC)的实时流处理两种典型模式。
环境与数据准备
在开始同步任务前,需完成基础环境配置:
- 驱动准备:下载OceanBase JDBC驱动(如
oceanbase-client-2.4.16.jar)和MySQL驱动,放置于SeaTunnel的 lib/ 目录下。
- 插件准备:下载
connector-cdc-mysql 插件,放置于 connectors/ 目录,用于后续实时同步。
- 数据准备:在MySQL数据库中创建测试表并插入数据,作为同步的源。
CREATE TABLE test1 (id INT PRIMARY KEY, name VARCHAR(20));
INSERT INTO test1 VALUES (1,'张三');
INSERT INTO test1 VALUES (2,'李四');
INSERT INTO test1 VALUES (3,'王五');
示例一:批处理数据同步(MySQL -> Console)
本示例将MySQL数据一次性读取并输出到控制台,演示基本的批处理流程。
配置文件 (ocean.conf):
env {
parallelism = 1
job.mode = "BATCH"
}
source {
Jdbc {
driver = "com.oceanbase.jdbc.Driver"
url = "jdbc:oceanbase://10.0.20.227:2883/db1?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
username = "root"
password = "8pK0z8gyjPvJmC9CItjW"
compatible_mode = "mysql"
query = "select * from test1"
}
}
sink {
Console {}
}
关键参数说明:
compatible_mode:设置为 "mysql",以兼容OceanBase的MySQL模式。
query:指定需要同步数据的查询语句。
运行命令与输出:
执行以下命令启动同步任务:
bin/seatunnel.sh --config job/ocean.conf -m local
任务执行后,可在控制台日志中看到同步成功的三条数据记录,验证了批处理同步的有效性。
示例二:实时数据同步(MySQL CDC -> OceanBase)
此示例展示如何通过MySQL CDC源端实时捕获数据变更,并同步到OceanBase目标库,这是构建实时数仓的常见场景。
1. 源端MySQL表结构
CREATE TABLE `test2` (
`id` int(11) NOT NULL,
`name` varchar(60) DEFAULT NULL,
`city` varchar(60) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- 初始化数据
INSERT INTO `test2` VALUES (1, '张三', '北京');
INSERT INTO `test2` VALUES (2, '李四', '上海');
INSERT INTO `test2` VALUES (3, '王五', '广州');
2. 配置文件 (mysql_ocean.conf):
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
}
source {
MySQL-CDC {
base-url = "jdbc:mysql://localhost:3306/cdc?serverTimezone=UTC"
username = "root"
password = "root"
table-names = ["cdc.test2"]
startup.mode = "initial"
}
}
sink {
jdbc {
driver = "com.oceanbase.jdbc.Driver"
url = "jdbc:oceanbase://10.0.20.227:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
username = "root"
password = "8pK0z8gyjPvJmC9CItjW"
compatible_mode = "mysql"
generate_sink_sql = true
database = test
table = test2
primary_keys = ["id"]
}
}
关键参数说明:
startup.mode:"initial" 表示首次启动时先做全量快照,再持续监听增量变更。
generate_sink_sql:设置为 true,SeaTunnel会自动根据源表结构在目标端生成建表语句。
primary_keys:定义目标表的主键,用于实现UPSERT语义,保障数据一致性。
3. 运行与验证
启动任务:
bin/seatunnel.sh --config job/mysql_ocean.conf -m local
任务启动后,首先会将初始化数据全量同步至OceanBase。随后在MySQL源表执行插入操作:
INSERT INTO test2 VALUES (4,'赵六','深圳');
稍作等待,查询OceanBase目标表,即可发现新增数据已被实时同步,这充分证明了SeaTunnel在大数据实时集成场景下的能力。
总结
通过以上两个实战示例,我们展示了使用SeaTunnel完成MySQL到OceanBase数据同步的完整流程。批处理模式适用于一次性数据迁移或T+1离线同步场景;而CDC实时同步模式则能够满足对数据时效性要求高的业务,为实时分析、数据中台提供稳定的数据流。SeaTunnel凭借其简洁的配置、强大的连接器生态以及稳定高效的执行引擎,已成为企业构建现代化数据管道的重要工具之一。