canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析的数据同步工具,提供增量数据的订阅和消费能力。本文将通过详细的步骤,展示如何使用canal将MySQL的增量数据同步到Elasticsearch中。

1 集群模式

图中的server对应一个canal的运行实例,也就是一个JVM进程。
一个server中可以包含一个或多个instance,我们可以将instance理解为一个独立的数据同步任务。
每个instance主要由以下几个核心模块构成:
- eventParser
负责数据源接入,通过模拟MySQL Slave协议与Master进行交互,并完成Binlog协议的解析。
- eventSink
作为Parser和Store之间的链接器,负责数据的过滤、加工和分发。
- eventStore
负责解析后数据的存储。
- metaManager
负责管理增量数据的订阅和消费信息。
在实际生产环境中,canal的高可用通常依赖于ZooKeeper。从客户端消费模式来看,可以简单分为 TCP直连模式 和 MQ模式。
我们更推荐使用MQ模式进行实战。因为MQ模式的优势在于解耦——canal server将数据变更事件发送到消息队列(如Kafka或RocketMQ),下游的消费者只需消费消息并顺序执行相应的业务逻辑即可,系统架构更加清晰和灵活。
关于顺序消费:
在指定的Topic下,所有消息会根据分区键(Sharding Key)被分配到不同的分区中。同一个分区内的消息遵循严格的先进先出(FIFO)原则进行发布和消费。这意味着同一分区内的消息顺序得到保证,而不同分区之间的消息顺序则没有要求。

2 MySQL配置
-
开启Binlog
对于自建MySQL,首先需要开启Binlog写入功能,并将binlog-format设置为ROW模式。在my.cnf配置文件中添加以下配置:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复
注意:如果使用的是阿里云RDS for MySQL,默认已开启binlog,并且账号默认拥有binlog dump权限,可以跳过此步骤。
-
授权canal账户
授权用于连接MySQL的canal账户拥有作为MySQL Slave的权限。如果已有账户,可以直接执行GRANT命令。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
-
创建测试表
创建一个商品表 t_product 用于测试。
CREATE TABLE `t_product` (
`id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
`name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
`price` DECIMAL ( 10, 2 ) NOT NULL,
`status` TINYINT ( 4 ) NOT NULL,
`create_time` datetime NOT NULL,
`update_time` datetime NOT NULL,
PRIMARY KEY ( `id` )
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin
3 Elasticsearch配置
使用Kibana或直接调用API创建商品索引 t_product。
PUT /t_product
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"id": {
"type":"keyword"
},
"name": {
"type":"text"
},
"price": {
"type":"double"
},
"status": {
"type":"integer"
},
"createTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"updateTime": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
}
}
}
}
执行成功后,会收到类似以下的响应:

4 RocketMQ 配置
在RocketMQ中创建一个主题(Topic):product-syn-topic。后续canal会将解析到的Binlog数据变更事件发送到这个主题中。


5 canal 配置
这里我们以canal 1.1.6版本为例,进入其conf目录进行配置。
1、修改全局配置文件 canal.properties
#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
###### 以下部分是默认值 展示出来
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true
2、配置实例文件 instance.properties
在conf目录下,创建以canal.destinations值命名的目录product-syn,并在该目录下创建配置文件instance.properties。
# 按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# table regex
canal.instance.filter.regex=mytest.t_product
# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
3、启动服务并验证
启动两个canal服务实例以实现高可用。随后,可以通过ZooKeeper的GUI工具(或命令行)查看instance的运行状态。

此时,如果你在MySQL的t_product表中修改一条记录,就可以在RocketMQ的控制台中观察到这条变更事件已经被封装成消息发送到了指定的Topic。

6 消费者实现
消费者端需要完成两件事:实现操作ES的索引服务,以及消费MQ消息并调用该服务。
1、产品索引操作服务
这是一个用于向Elasticsearch中添加或删除商品索引的Java服务类。
/**
* 商品索引管理服务
*/
@Service
public class ProductIndexService {
private final static Logger logger = LoggerFactory.getLogger(ProductIndexService.class);
@Autowired
private ElasticsearchClient elasticsearchClient;
public void saveProduct(ProductPo product) throws Exception {
IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
.index("t_product")
.id(String.valueOf(product.getId()))
.document(product).build();
IndexResponse response = elasticsearchClient.index(indexRequest);
logger.info("response:" + response);
}
public void deleteProduct(Integer id) throws Exception {
elasticsearchClient.delete(d -> d.index("t_product").id(String.valueOf(id)));
}
}
商品索引服务代码
2、消息消费监听器
这是一个顺序消费监听器,负责处理从RocketMQ获取的canal消息。
@Configuration
public class RocketMQConfig {
@Bean
public DefaultMQPushConsumer createProductConsumer() throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(PRODUCT_CONSUMER_GROUP);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setConsumeMessageBatchMaxSize(1);
// 注册主题 product-syn-topic
pushConsumer.subscribe("product-syn-topic", "*");
// 监听器
pushConsumer.registerMessageListener(productToESMessageListener);
pushConsumer.start();
return pushConsumer;
}
}
@Component
public class ProductToESMessageListener implements MessageListenerOrderly {
private final Logger logger = LoggerFactory.getLogger(OrderPointMessageListener.class);
@Autowired
private ProductIndexService productIndexService;
@Override
public ConsumerOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String data = new String(messageExt.getBody(), "UTF-8");
logger.info("data:" + data);
JSONObject json = JSON.parseObject(data);
String dataNode = json.getString("data");
String type = json.getString("type");
List<ProductPo> productPoList = JSON.parseArray(dataNode, ProductPo.class);
for (ProductPo product : productPoList) {
if ("UPDATE".equals(type) || "INSERT".equals(type)) {
productIndexService.saveProduct(product);
}
if("DELETE".equals(type)) {
productIndexService.deleteProduct(product.getId());
}
}
}
} catch (Exception e) {
logger.error("consumeMessage error: ", e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
RocketMQ消息监听器配置与实现
消费者逻辑的核心在于两点:
- 使用顺序消费监听器(
MessageListenerOrderly),确保同一分区内消息的处理顺序与接收顺序一致,这对于数据同步至关重要。
- 将消息体解析为JSON,从
data节点中提取表的最新数据(可能是单条或多条),然后根据操作类型(UPDATE、INSERT、DELETE)调用ProductIndexService中相应的方法来更新Elasticsearch中的索引。
7 总结
canal是一个设计精巧、实用性强的开源项目,许多公司基于它构建了企业级的数据传输服务(Data Transmission Service,简称DTS)。深入研究这个项目,你可以学习到网络编程、多线程模型、高性能队列Disruptor以及流程抽象等多种优秀的编程实践。
本文涉及到的完整示例代码可以在以下GitHub仓库中找到,供读者参考与实践。
https://github.com/makemyownlife/rocketmq4-learning

希望这篇实战指南能帮助你理解和搭建基于canal的数据同步链路。如果你想了解更多关于后端架构或数据处理的实战内容,欢迎持续关注云栈社区。