找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

394

积分

0

好友

52

主题
发表于 昨天 09:41 | 查看: 6| 回复: 0

canal是阿里巴巴开源的一款基于MySQL数据库增量日志解析的数据同步工具,提供增量数据的订阅和消费能力。本文将通过详细的步骤,展示如何使用canal将MySQL的增量数据同步到Elasticsearch中。

Canal同步MySQL到ES思维导图

1 集群模式

Canal同步架构图

图中的server对应一个canal的运行实例,也就是一个JVM进程。

一个server中可以包含一个或多个instance,我们可以将instance理解为一个独立的数据同步任务

每个instance主要由以下几个核心模块构成:

  • eventParser
    负责数据源接入,通过模拟MySQL Slave协议与Master进行交互,并完成Binlog协议的解析。
  • eventSink
    作为Parser和Store之间的链接器,负责数据的过滤、加工和分发。
  • eventStore
    负责解析后数据的存储。
  • metaManager
    负责管理增量数据的订阅和消费信息。

在实际生产环境中,canal的高可用通常依赖于ZooKeeper。从客户端消费模式来看,可以简单分为 TCP直连模式MQ模式

我们更推荐使用MQ模式进行实战。因为MQ模式的优势在于解耦——canal server将数据变更事件发送到消息队列(如Kafka或RocketMQ),下游的消费者只需消费消息并顺序执行相应的业务逻辑即可,系统架构更加清晰和灵活。

关于顺序消费
在指定的Topic下,所有消息会根据分区键(Sharding Key)被分配到不同的分区中。同一个分区内的消息遵循严格的先进先出(FIFO)原则进行发布和消费。这意味着同一分区内的消息顺序得到保证,而不同分区之间的消息顺序则没有要求。

消息队列分区与顺序消费示意图

2 MySQL配置

  1. 开启Binlog
    对于自建MySQL,首先需要开启Binlog写入功能,并将binlog-format设置为ROW模式。在my.cnf配置文件中添加以下配置:

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复

    注意:如果使用的是阿里云RDS for MySQL,默认已开启binlog,并且账号默认拥有binlog dump权限,可以跳过此步骤。

  2. 授权canal账户
    授权用于连接MySQL的canal账户拥有作为MySQL Slave的权限。如果已有账户,可以直接执行GRANT命令。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
  3. 创建测试表
    创建一个商品表 t_product 用于测试。

    CREATE TABLE `t_product` (
     `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
     `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
     `price` DECIMAL ( 10, 2 ) NOT NULL,
     `status` TINYINT ( 4 ) NOT NULL,
     `create_time` datetime NOT NULL,
     `update_time` datetime NOT NULL,
       PRIMARY KEY ( `id` ) 
    ) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用Kibana或直接调用API创建商品索引 t_product

PUT /t_product
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mappings": {
            "properties": {
               "id": {
                    "type":"keyword"
                },
                "name": {
                    "type":"text"
                },
                "price": {
                    "type":"double"
                },
                "status": {
                    "type":"integer"
                },
                "createTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

执行成功后,会收到类似以下的响应:

Elasticsearch创建索引响应结果

4 RocketMQ 配置

在RocketMQ中创建一个主题(Topic):product-syn-topic。后续canal会将解析到的Binlog数据变更事件发送到这个主题中。

RocketMQ控制台创建主题
RocketMQ主题列表

5 canal 配置

这里我们以canal 1.1.6版本为例,进入其conf目录进行配置。

1、修改全局配置文件 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默认值 展示出来 
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true

2、配置实例文件 instance.properties
conf目录下,创建以canal.destinations值命名的目录product-syn,并在该目录下创建配置文件instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

3、启动服务并验证
启动两个canal服务实例以实现高可用。随后,可以通过ZooKeeper的GUI工具(或命令行)查看instance的运行状态。

ZooKeeper中查看Canal实例运行状态

此时,如果你在MySQL的t_product表中修改一条记录,就可以在RocketMQ的控制台中观察到这条变更事件已经被封装成消息发送到了指定的Topic。

RocketMQ控制台中的Canal同步消息详情

6 消费者实现

消费者端需要完成两件事:实现操作ES的索引服务,以及消费MQ消息并调用该服务。

1、产品索引操作服务
这是一个用于向Elasticsearch中添加或删除商品索引的Java服务类。

/**
 * 商品索引管理服务
 */
@Service
public class ProductIndexService {

    private final static Logger logger = LoggerFactory.getLogger(ProductIndexService.class);
    @Autowired
    private ElasticsearchClient elasticsearchClient;

    public void saveProduct(ProductPo product) throws Exception {
        IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
                .index("t_product")
                .id(String.valueOf(product.getId()))
                .document(product).build();

        IndexResponse response = elasticsearchClient.index(indexRequest);
        logger.info("response:" + response);
    }

    public void deleteProduct(Integer id) throws Exception {
        elasticsearchClient.delete(d -> d.index("t_product").id(String.valueOf(id)));
    }
}

商品索引服务代码

2、消息消费监听器
这是一个顺序消费监听器,负责处理从RocketMQ获取的canal消息。

@Configuration
public class RocketMQConfig {
    @Bean
    public DefaultMQPushConsumer createProductConsumer() throws MQClientException {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(PRODUCT_CONSUMER_GROUP);
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");
        pushConsumer.setConsumeMessageBatchMaxSize(1);
        // 注册主题 product-syn-topic
        pushConsumer.subscribe("product-syn-topic", "*");
        // 监听器
        pushConsumer.registerMessageListener(productToESMessageListener);
        pushConsumer.start();
        return pushConsumer;
    }
}

@Component
public class ProductToESMessageListener implements MessageListenerOrderly {
    private final Logger logger = LoggerFactory.getLogger(OrderPointMessageListener.class);
    @Autowired
    private ProductIndexService productIndexService;
    @Override
    public ConsumerOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt messageExt : msgs) {
                String data = new String(messageExt.getBody(), "UTF-8");
                logger.info("data:" + data);
                JSONObject json = JSON.parseObject(data);
                String dataNode = json.getString("data");
                String type = json.getString("type");
                List<ProductPo> productPoList = JSON.parseArray(dataNode, ProductPo.class);
                for (ProductPo product : productPoList) {
                    if ("UPDATE".equals(type) || "INSERT".equals(type)) {
                        productIndexService.saveProduct(product);
                    }
                    if("DELETE".equals(type)) {
                        productIndexService.deleteProduct(product.getId());
                    }
                }
            }
        } catch (Exception e) {
            logger.error("consumeMessage error: ", e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

RocketMQ消息监听器配置与实现

消费者逻辑的核心在于两点:

  1. 使用顺序消费监听器MessageListenerOrderly),确保同一分区内消息的处理顺序与接收顺序一致,这对于数据同步至关重要。
  2. 将消息体解析为JSON,从data节点中提取表的最新数据(可能是单条或多条),然后根据操作类型(UPDATEINSERTDELETE)调用ProductIndexService中相应的方法来更新Elasticsearch中的索引。

7 总结

canal是一个设计精巧、实用性强的开源项目,许多公司基于它构建了企业级的数据传输服务(Data Transmission Service,简称DTS)。深入研究这个项目,你可以学习到网络编程、多线程模型、高性能队列Disruptor以及流程抽象等多种优秀的编程实践。

本文涉及到的完整示例代码可以在以下GitHub仓库中找到,供读者参考与实践。

https://github.com/makemyownlife/rocketmq4-learning

示例代码GitHub仓库截图

希望这篇实战指南能帮助你理解和搭建基于canal的数据同步链路。如果你想了解更多关于后端架构或数据处理的实战内容,欢迎持续关注云栈社区




上一篇:DDR3主板市场热度回升:旧平台魔改与性价比之选分析
下一篇:GitLab安装部署指南:在Ubuntu/Debian上自托管企业级私有仓库
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-18 16:48 , Processed in 0.220413 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表