找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

676

积分

0

好友

86

主题
发表于 前天 09:14 | 查看: 7| 回复: 0

在项目开发中,我们经常遇到需要将数据同步到其他数据库的场景,例如不同的业务系统需要数据共享,或者将本平台的数据同步到第三方平台。传统的定时任务+接口调用的方式,往往面临实时性差、协调成本高和可靠性低等问题。

传统同步方案的痛点

  • 实时性差:通常只能做到分钟级或小时级同步,数据延迟高。
  • 协调成本高:需要各部门系统配合开发数据上报接口,沟通协调工作量大。
  • 开发工作量大:每个对接部门都需要定制化开发接口,维护成本高。
  • 可靠性问题:接口调用失败时数据容易丢失,需要复杂的重试和补偿机制。

经过实际项目应用,总结出Flink CDC在数据同步方面的核心优势:

1. 同步模式灵活

  • 全量同步:支持历史数据一次性迁移,适合初始化场景。
  • 增量同步:实时捕获数据变更,保持数据同步状态。
  • 混合模式:先全量后增量,无缝切换,业务无感知。

2. 实时性能卓越

  • 毫秒级延迟:源表数据的增删改操作,几乎实时同步到目标表。
  • 高吞吐量:单任务可处理万级TPS,满足大部分业务场景。
  • 资源可控:支持背压机制,自动调节同步速度。

3. 使用门槛低

  • 配置驱动:通过SQL配置即可完成同步任务,无需编程。
  • 可视化管理:Web界面监控任务状态,运维友好。

需要注意的是,使用Flink CDC的前提条件是确保Flink服务器与源数据库(Oracle)和目标数据库(MySQL)之间具备稳定的网络连通性。

1. Flink环境搭建

1.1 部署模式选择

Flink支持多种部署模式,根据业务场景选择合适的部署方案:

  • Standalone模式:适合开发测试和小规模生产环境。
  • Yarn模式:适合大数据集群环境,资源管理更灵活。
  • Kubernetes模式:适合容器化部署,弹性伸缩能力强。

本次实践采用Standalone独立部署模式,具有部署简单、维护方便的特点。

1.2 安装步骤

  1. 下载Flink安装包(本文使用 flink-1.19.3-bin-scala_2.12.tgz),下载地址:https://flink.apache.org/downloads/
  2. 解压并设置环境变量。
  3. 启动集群。
tar -xzf flink-*.tgz
export FLINK_HOME=/path/flink-*

# 启动集群
cd /path/flink-*
./bin/start-cluster.sh

# 关闭集群
./bin/stop-cluster.sh

如果需要在Web界面查看状态,需要修改安装路径下 /conf/flink-conf.yaml 中的相关配置,然后重启集群。

rest:
  # The address to which the REST client will connect to
  address: 172.29.236.202
  bind-address: 0.0.0.0
  # 默认是8081,由于被占用,改成了8082
  port: 8082

如果是单机部署且计划运行的任务并发数大于1,需要调整TaskManager的slot数量。slot可以理解为Flink中的执行单元,一个slot可以执行一个任务子任务。配置时可以根据服务器CPU核心数来设定。

同样修改 /conf/flink-conf.yaml

taskmanager:
  # 默认是1
  numberOfTaskSlots: 16
  memory:
    process:
      size: 8192m # 默认1728m,建议调大内存,不然数据量过大时容易OOM

启动后,可以通过Web界面(地址:http://Flink服务器IP:8082)查看集群状态。

Apache Flink Dashboard 截图,显示运行中的任务列表

2. 依赖驱动准备

使用Flink CDC之前,需要将必要的连接器JAR包放入Flink安装目录的 /lib/ 文件夹下。根据同步任务所需的源端和目标端数据库不同,所需的驱动包也不同。

本次任务需要将Oracle的数据同步到MySQL,需要准备Oracle CDC连接器和JDBC连接器等相关JAR包。具体依赖信息可在Flink CDC官方文档中查看。

Flink CDC 官方文档中关于 Oracle CDC 连接器依赖的说明截图

下图列出了本次任务下载并放置到 /lib/ 目录的部分核心JAR包。

Flink lib 目录下的 JAR 文件列表,包含 Flink CDC、JDBC 连接器及数据库驱动

3. 数据库环境配置

3.1 数据接收端(MySQL)配置

3.1.1 用户权限配置

出于安全考虑,建议为Flink同步任务创建一个专用的数据库账户,并授予最小必要权限。

-- 创建专用账号(仅限来自 Flink 主机的访问),假设安装flink的机器ip:172.29.236.202
CREATE USER 'flink'@'172.29.236.202' IDENTIFIED BY 'YourPassword';

-- 赋权(按需最小权限,JDBC upsert 需要 SELECT/INSERT/UPDATE/DELETE),假设mysql模式名为flow
GRANT SELECT, INSERT, UPDATE, DELETE ON flow.* TO 'flink'@'172.29.236.202';

FLUSH PRIVILEGES;

-- 验证用户创建结果:
SELECT host, user, plugin FROM mysql.user WHERE user IN ('flink','root');

3.1.2 连接器配置

在Flink SQL中,通过以下语句定义指向MySQL目标表的连接器。

CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
  ID STRING NOT NULL,
  BANK_DEPOSIT STRING,
  NUM int,
  NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'flink',
  'password' = 'YourPassword',
  'table-name' = 'test_flink_object',
  'scan.fetch-size' = ‘500’
);

3.2 数据发送端(Oracle)配置

3.2.1 系统环境准备

以root用户创建必要的目录并设置权限,确保Oracle归档日志可正常写入。

# 创建Oracle用户和组(如果不存在)
sudo adduser oracle
# 创建归档日志目录,确保磁盘空间充足
mkdir -p ‘/u03/oracle/oradata/recovery_area’
chown -R oracle:oinstall ‘/u03/oracle’
chmod 750 ‘/u03/oracle/oradata/recovery_area’

3.2.2 数据库归档配置

使用具有DBA权限的账户登录Oracle,开启归档模式并配置补充日志,这是CDC能够捕获变更的前提。

-- 设置归档路径和大小
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = ‘/u03/oracle/oradata/recovery_area’ scope=spfile;
-- 重启数据库并开启归档模式
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

--查看数据归档模式,应该输出 “Database log mode: Archive Mode”
archive log list;

-- 开启数据库级最小补充日志(必须)
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- 开启指定表所有列的补充日志(推荐,确保能捕获所有列变更)
ALTER TABLE 模式名.表名 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

3.2.3 补充日志说明

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA 是CDC功能的核心配置,其作用是让Oracle在Redo Log中记录更完整的数据变更信息,包括变更前值、变更后值以及主键信息,以便Flink CDC准确解析。

3.2.4 CDC用户创建

创建一个专用于CDC同步的Oracle用户,并授予必要的权限。

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
-- 下面这个执行可能报错 不影响(Oracle 版本不支持 LOGMINING 系统权限(11g/12c 没有该权限))
GRANT LOGMINING TO flinkuser;
-- 更多权限...

注意:需要被同步的源表必须定义主键,这是Flink CDC进行增量快照和分片读取的关键依据。

4. 同步任务实现

首先,我们在Oracle中准备一张测试表 TEST_FLINK_OBJECT

Oracle 测试表数据预览

数据同步的Flink SQL脚本主要分为三个部分:定义源表连接器、定义目标表连接器、编写插入语句。

4.1 源表连接器配置 (Oracle CDC)

CREATE TABLE TEST_FLINK_OBJECT_ORACLE
   (ID STRING NOT NULL,
    BANK_DEPOSIT STRING,
    NUM int,
    NAME STRING,
    PRIMARY KEY (ID) NOT ENFORCED
   ) WITH (
    ‘connector’ = ‘oracle-cdc’,
    ‘hostname’ = ‘172.31.236.39’,
    ‘port’ = ‘1521’,
    ‘username’ = ‘flinkuser’,
    ‘password’ = ‘flinkpw’,
    ‘database-name’ = ‘cnywdb’,
    ‘schema-name’ = ‘U_CMS’,
    ‘table-name’ = ‘TEST_FLINK_OBJECT’, --映射的真实数据库中的表名
    ‘scan.startup.mode’ = ‘initial’,
    ‘scan.incremental.snapshot.enabled’ = ‘true’,
    -- 这个配置很重要,实践中如果没有显式配置分片键,大数据量时可能OOM
    ‘scan.incremental.snapshot.chunk.key-column’ = ‘ID’,
    ‘debezium.database.connection.adapter’ = ‘logminer’,
    ‘debezium.log.mining.strategy’ = ‘online_catalog’,
    ‘debezium.log.mining.continuous.mine’ = ‘true’
);

4.2 目标表连接器配置 (MySQL JDBC)

CREATE TABLE TEST_FLINK_OBJECT_MYSQL (
  ID STRING NOT NULL,
      BANK_DEPOSIT STRING,
      NUM int,
      NAME STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  ‘connector’ = ‘jdbc’,
  ‘url’ = ‘jdbc:mysql://172.29.230.54:3306/flow?useSSL=false&characterEncoding=utf8’,
  ‘driver’ = ‘com.mysql.cj.jdbc.Driver’,
  ‘username’ = ‘flink’,
  ‘password’ = ‘Syzxzs@2024’,
  ‘table-name’ = ‘test_flink_object’,
  ‘scan.fetch-size’ = ‘500’
);

4.3 数据同步任务

INSERT INTO TEST_FLINK_OBJECT_MYSQL (select * from TEST_FLINK_OBJECT_ORACLE);

4.4 任务提交与执行

启动Flink SQL Client,提交上面编写的任务。切换到Flink安装目录的 /bin/ 下,执行 ./sql-client.sh

Flink SQL Client 启动成功界面

在SQL Client中分别执行创建源表和目标表的SQL语句。

在 Flink SQL Client 中创建 Oracle CDC 源表的执行截图

最后执行数据插入语句,提交同步任务。

提交数据插入作业到 Flink 集群的执行结果

也可以将上述所有SQL语句写在一个文件中(例如 oracle-to-mysql-object.sql),通过命令 ./sql-client.sh -f oracle-to-mysql-object.sql 一次性执行。

执行成功后,可以在MySQL中查询到同步过来的数据。下图展示了在同步时对字段进行了别名映射(将 BANK_DEPOSITNAME 字段互换)后的结果。

MySQL 中同步结果数据预览(字段经过别名映射)

5. 生产实践经验与踩坑记录

5.1 关键配置优化

  • scan.incremental.snapshot.chunk.key-column = ‘ID’:这个配置至关重要。尽管源表有主键,但在实践中发现,必须显式指定此配置来告知Flink CDC使用哪个列进行分片。否则,在大数据量全量同步阶段,Flink可能会使用ROWID等非主键列进行低效分片,导致任务持续读取数据直至内存溢出(OOM)。下图就是未配置该参数时发生OOM的日志截图。

Flink CDC 任务因未正确分片导致内存溢出的错误日志

  • ALTER DATABASE ADD SUPPLEMENTAL LOG DATA:必须开启数据库级别的补充日志。最初我们只开启了表级补充日志,但在配置了主键分片后任务报错,提示需要数据库级日志归档。官方文档也建议同时开启数据库级和表级。

5.2 数据一致性保障

在实践中,我们同步了9张表,总数据量近4千万。发现在初始全量同步及后续增量同步中,偶有极少数据丢失。排查发现丢失的数据本身并无特殊之处,怀疑可能与Oracle 11g版本缺少LOGMINING权限或日志挖掘机制有关。其他数据库(如MySQL、PostgreSQL)的CDC同步则未发现此问题。

建议:在核心数据同步场景中,可以优先采用Flink CDC实现准实时同步。同时,为应对可能存在的极低概率数据不一致问题,应准备一个兜底方案。例如,可以借助AI工具快速生成一个定时对比和补数的脚本,两者结合既能保证实时性,又能确保最终一致性。

参考文档

“作业难写”表情包,用于表达技术实践中的挑战

通过本文的实践指南,你应该能够搭建起一个从Oracle到MySQL的实时数据同步通道。Flink CDC极大地简化了异构数据源同步的复杂度,但在生产部署时,务必关注分片配置、归档日志、内存调优等细节。如果你在实践过程中遇到其他问题,欢迎在云栈社区的大数据板块与大家交流讨论。




上一篇:OpenSpeedy:14k Star开源游戏变速工具实测,解锁帧率上限
下一篇:MUI Base UI:React 无样式组件库的定制指南与实战解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-24 02:49 , Processed in 0.470677 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表