
在某些业务场景中,我们常常需要记录数据库中数据的变更情况,例如保存操作日志、数据备份或实现缓存同步。一个理想的解决方案是与核心业务代码解耦,避免侵入性改造。本文将详细介绍如何利用 SpringBoot、Canal 和 RabbitMQ 搭建一套稳定、高效的数据库变更监听系统。
核心需求与方案选型
我的核心需求是在 SpringBoot 应用中,以非侵入的方式记录数据变更,内容需包含新数据,对于更新操作还需要能获取变更前的旧数据。
经过技术调研,我发现 Canal 是一个很好的选择,它通过伪装成 MySQL 的 Slave,实时解析数据库的 binlog 日志来捕获数据变动。然而,监听到变更后需要立即进行持久化或其他业务处理,这可能会影响主流程的性能与稳定性。因此,我决定引入 RabbitMQ 消息队列进行异步解耦,让 Canal 将变更事件投递到消息队列,再由独立的消费者进行处理。
实现步骤概览
整个实践流程可以分为以下几个关键步骤:
- 环境准备:使用 Docker 启动 MySQL,并开启 binlog 功能。
- 部署 Canal:启动 Canal 服务,并为其在 MySQL 中创建具有复制权限的账号,使其能以 Slave 身份连接。
- TCP 模式初探:将 Canal 服务模式设为 TCP,编写 Java 客户端直连,验证监听功能。
- 接入消息队列:将 Canal 服务模式改为 RabbitMQ,配置其与 RabbitMQ 的连接,通过消息队列接收变更事件。
- SpringBoot 消费:在
SpringBoot 应用中整合 RabbitMQ,编写消费者处理来自 Canal 的消息。
环境搭建(基于 Docker Compose)
我们首先通过 docker-compose.yml 一键拉起所需的服务:MySQL、RabbitMQ 以及 Canal-Server。
version: "3"
services:
mysql:
network_mode: mynetwork
container_name: mymysql
ports:
- 3306:3306
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/mymysql/data:/data
- /home/mycontainers/mymysql/mysql:/var/lib/mysql
- /home/mycontainers/mymysql/conf:/etc/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
command:
--character-set-server=utf8mb4
--collation-server=utf8mb4_unicode_ci
--log-bin=/var/lib/mysql/mysql-bin
--server-id=1
--binlog-format=ROW
--expire_logs_days=7
--max_binlog_size=500M
image: mysql:5.7.20
rabbitmq:
container_name: myrabbit
ports:
- 15672:15672
- 5672:5672
restart: always
volumes:
- /etc/localtime:/etc/localtime
- /home/mycontainers/myrabbit/rabbitmq:/var/lib/rabbitmq
network_mode: mynetwork
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
image: rabbitmq:3.8-management
canal-server:
container_name: canal-server
restart: always
ports:
- 11110:11110
- 11111:11111
- 11112:11112
volumes:
- /home/mycontainers/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
- /home/mycontainers/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /home/mycontainers/canal-server/logs:/home/admin/canal-server/logs
network_mode: mynetwork
depends_on:
- mysql
- rabbitmq
# - canal-admin
image: canal/canal-server:v1.1.5
接下来,我们需要重点配置 Canal 的两个核心配置文件 canal.properties 和 instance.properties,并通过 volumes 映射到宿主机对应路径。
配置文件映射说明:
canal.properties:Canal Server 的主配置文件。
instance.properties:Instance(实例)的配置文件。路径中的 example 是实例名,需要与 canal.properties 中的 canal.destinations 配置项对应。一个客户端通常连接一个实例。
以下是配置文件的详细内容:
canal.properties (关键配置节选)
################################################
######## common argument ############
################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
...
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
...
################################################
######## destinations ############
################################################
canal.destinations = canal-exchange
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
...
#################################################
######## RabbitMQ ############
#################################################
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = admin
rabbitmq.password = 123456
rabbitmq.deliveryMode =
注意:此时 canal.serverMode = tcp,我们先以 TCP 直连模式启动,用手动编写的 Java 客户端进行测试,后续再改为 rabbitMQ 模式。从注释可以看出,Canal 支持 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ 等多种服务模式。
instance.properties (关键配置节选)
################################################
# mysql serverId , v1.0.26+ will autoGen
#canal.instance.mysql.slaveId=123
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=mymysql:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
...
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
...
# mq config
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\..*,.*\..*
canal.mq.partition=0
关键配置修改与MySQL账号准备
我们需要确保 instance.properties 中的数据库连接配置正确:
canal.instance.master.address=mymysql:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
mymysql:3306:因为 MySQL 和 Canal 同处于 Docker 的 mynetwork 网络下,所以直接使用容器名和内部端口 3306 连接。
dbUsername 和 dbPassword:需要我们在 MySQL 中创建对应的用户并授权。
建议专门为 Canal 创建一个用户,而非直接使用 root,执行以下 SQL:
-- 进入docker中的mysql容器
docker exec -it mymysql bash
-- 进入mysql指令模式
mysql -uroot -proot
-- 选择mysql
use mysql;
-- 创建canal用户,账密:canal/canal
create user 'canal'@'%' identified by 'canal';
-- 分配复制所需权限,并允许所有主机登录
grant SELECT, INSERT, UPDATE, DELETE, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
-- 刷新权限
flush privileges;
-- 附带一个删除用户指令
drop user 'canal'@'%';
创建后,可使用 Navicat 或命令行测试 canal 用户是否能正常登录和访问。
整合SpringBoot:实现Canal TCP客户端
在切换至 RabbitMQ 模式前,我们先通过 TCP 模式验证整个链路是否通畅。在 SpringBoot 项目中引入 Canal 客户端依赖。
Maven 依赖:
<canal.version>1.1.5</canal.version>
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal.version}</version>
</dependency>
编写 Canal 客户端组件:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
private final static int BATCH_SIZE = 1000;
public void run() {
// 创建链接,连接到Canal Server
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");
try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*\\..*");
//回滚到未进行ack的地方
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
// 进行 batch id 的确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR # parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
CanalEntry.EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
在启动类中运行客户端:
@SpringBootApplication
public class BaseApplication implements CommandLineRunner {
@Autowired
private CanalClient canalClient;
@Override
public void run(String... args) throws Exception {
canalClient.run();
}
}
启动应用,此时对监听的 MySQL 数据库进行增删改操作,控制台就会打印出详细的变更信息。不过,直接将处理逻辑放在客户端并不优雅,我们更希望将监听到的事件异步化处理,这就需要引入 RabbitMQ。
Canal 整合 RabbitMQ
接下来,我们将 Canal 的生产者模式从 TCP 切换为 RabbitMQ。
- 修改
canal.properties 中的服务模式:
canal.serverMode = rabbitMQ
- 确保
instance.properties 中的 topic 配置(用于后续的 RabbitMQ routing key):
canal.mq.topic=canal-routing-key
- 核对并完善 RabbitMQ 连接配置(仍在
canal.properties 中):
#################################################
######## RabbitMQ ############
#################################################
# 连接rabbit,写IP或容器名(同一网络下)
rabbitmq.host = myrabbit
rabbitmq.virtual.host = /
# 交换器名称
rabbitmq.exchange = canal-exchange
# 账密
rabbitmq.username = admin
rabbitmq.password = 123456
# 暂不支持指定端口,使用的是默认的5672
- 重启 Canal-Server 容器,使配置生效。
此时,Canal 就会将解析到的 binlog 事件发送到指定的 RabbitMQ 交换器。你可以在 RabbitMQ 的管理界面(http://localhost:15672)手动创建交换器(canal-exchange)和队列(canal-queue),并用 canal-routing-key 进行绑定,以直观地查看消息流动。当然,下一步我们会在 SpringBoot 中自动创建这些组件。
SpringBoot 整合 RabbitMQ 消费消息
现在,我们需要在另一个(或同一个) SpringBoot 应用中,消费来自 Canal 的消息。
引入依赖:
<amqp.version>2.3.4.RELEASE</amqp.version>
<!--消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${amqp.version}</version>
</dependency>
配置 application.yml:
spring:
rabbitmq:
host: 192.168.0.108 # 替换为你的RabbitMQ服务器地址
port: 5672
username: admin
password: 123456
# 消息确认配置项
# 确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
# 确认消息已发送到队列(Queue)
publisher-returns: true
创建 RabbitMQ 配置类(配置序列化方式):
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
/**
* 配置监听器容器工厂,解决消息反序列化问题
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
定义队列、交换器及绑定关系(生产者配置):
@Configuration
public class CanalProvider {
public static final String CanalQueue = "canal-queue";
public static final String CanalExchange = "canal-exchange";
public static final String CanalRouting = "canal-routing-key";
/**
* 队列
*/
@Bean
public Queue canalQueue() {
return new Queue(CanalQueue, true);
}
/**
* 交换机,这里使用直连交换机
*/
@Bean
DirectExchange canalExchange() {
return new DirectExchange(CanalExchange, true, false);
}
/**
* 绑定交换机和队列,并设置匹配键
*/
@Bean
Binding bindingCanal() {
return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(CanalRouting);
}
}
编写消息消费者:
/**
* Canal消息消费者
*/
@Component
@RabbitListener(queues = RabbitConstant.CanalQueue)
public class CanalComsumer {
private final SysBackupService sysBackupService;
public CanalComsumer(SysBackupService sysBackupService) {
this.sysBackupService = sysBackupService;
}
@RabbitHandler
public void process(Map<String, Object> msg) {
System.out.println("收到canal消息:" + msg);
boolean isDdl = (boolean) msg.get("isDdl");
// 不处理DDL事件
if (isDdl) {
return;
}
// TiCDC的id,应该具有唯一性,先保存再说
int tid = (int) msg.get("id");
// TiCDC生成该消息的时间戳,13位毫秒级
long ts = (long) msg.get("ts");
// 数据库
String database = (String) msg.get("database");
// 表
String table = (String) msg.get("table");
// 类型:INSERT/UPDATE/DELETE
String type = (String) msg.get("type");
// 每一列的数据值
List<?> data = (List<?>) msg.get("data");
// 仅当type为UPDATE时才有值,记录每一列的名字和UPDATE之前的数据值
List<?> old = (List<?>) msg.get("old");
// 跳过sys_backup,防止无限循环(如果你的备份表叫这个)
if ("sys_backup".equalsIgnoreCase(table)) {
return;
}
// 只处理指定类型
if (!"INSERT".equalsIgnoreCase(type)
&& !"UPDATE".equalsIgnoreCase(type)
&& !"DELETE".equalsIgnoreCase(type)) {
return;
}
// 在这里实现你的业务逻辑,例如保存变更记录到数据库
// sysBackupService.saveBackup(...);
}
}
测试验证
完成以上所有步骤后,启动你的 SpringBoot 应用。此时,当你对 MySQL 中的数据进行修改,Canal 会捕捉到变更,并通过 RabbitMQ 将消息发送到 canal-queue 队列。你的 SpringBoot 消费者将会收到这条消息,并可以在 process 方法中执行相应的业务逻辑,例如将变更记录持久化到另一个表,实现了与核心业务完全解耦的数据变更监听。
通过本文的实践,我们构建了一套基于 SpringBoot + Canal + RabbitMQ 的数据库变更监听方案。该方案利用了 Canal 对 MySQL binlog 的实时解析能力,以及 RabbitMQ 的异步和解耦特性,确保了系统的稳定性和可扩展性。你可以根据实际业务需求,在消息消费者中扩展更复杂的处理逻辑。如果你在云栈社区(https://yunpan.plus)有更多关于 Java 或 RabbitMQ 的问题,欢迎深入探讨。