找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2266

积分

0

好友

304

主题
发表于 1 小时前 | 查看: 3| 回复: 0

之前简单聊过利用binlog4j进行数据库变更监听,但更多是停留在概念层面。今天,我们来一次“干货”级别的实战,分享如何基于这套监听机制,结合Redis的发布订阅功能,构建一个解耦、高性能的数据变更处理架构。

下图清晰地展示了整个处理流程的架构:

binlog4j数据监听异步处理架构图

核心思路是:binlog4j作为监听客户端捕获MySQL的binlog变更后,通过统一的handler入口进行处理。随后,事件可以根据业务需求选择两种路径:一是通过DataChangeHandlerService进行同步的、直接的业务处理;二是通过DataChangeDispatcher将事件发布到消息中间件(如Redis Pub/Sub),由其他服务异步订阅消费,实现业务解耦。

项目集成与配置

我这次是在一个基于老版本若依(RuoYi)框架的项目中进行集成。受限于框架版本,这里分享的是经过亲测可用的最高版本依赖。

Maven依赖 (pom.xml)

<dependency>
    <groupId>com.gitee.Jmysy</groupId>
    <artifactId>binlog4j-spring-boot-starter</artifactId>
    <version>1.9.0</version>
</dependency>

YAML配置 (application.yml)

spring:
  binlog4j:
    client-configs:
      master:  # 客户端名称,可自定义
        host: ip
        port: 3306
        username: 用户名
        password: 密码
        serverId: 12 # 伪装成从库的 ID,不可与 MySQL server-id 重复
        persistence: false # 是否开启宕机续读(需要 Redis 支持)
    redis-config: # 如果开启 persistence,需配置 Redis 存储位点
      host: ip
      port: 6379
      password: 密码
      database: 0  # 显式加上这个字段试试

注意binlog4j的配置项是直接放在 spring: 根节点下的。

核心代码实现

1. 数据变更统一监听器 (DataChangeHandler)

这个类是binlog4j事件的统一入口,使用@BinlogSubscriber注解声明,并实现了IBinlogEventHandler接口。它负责将原始的binlog事件封装成统一的DataChangeInfo对象,然后分发给后续的服务。

/**
 * 数据变化监听handler
 *
 * @author zwmac
 */
@Slf4j
@BinlogSubscriber(clientName = "master") // 对应 yml 中的客户端名称
public class DataChangeHandler implements IBinlogEventHandler<Object> {

    @Resource
    private DataChangeHandlerService dataChangeHandlerService;

    @Resource
    private DataChangeDispatcher dataChangeDispatcher;

    @Override
    public void onInsert(BinlogEvent<Object> event) {
        Object data = event.getData();
        DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
        dataChangeInfo.setOldData(null);
        dataChangeInfo.setNewData(data);
        dataChangeInfo.setChangeTable(event.getTable());
        dataChangeInfo.setChangeType("insert");
        dataChangeInfo.setChangeTime(DateUtil.date());
        dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
        dataChangeDispatcher.dispatch(dataChangeInfo);
        log.info("监听到插入数据: {}", data);
    }

    @Override
    public void onUpdate(BinlogEvent<Object> event) {
        // 更新前的数据
        Object oldData = event.getOriginalData();
        // 更新后的数据
        Object newData = event.getData();
        DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
        dataChangeInfo.setOldData(oldData);
        dataChangeInfo.setNewData(newData);
        dataChangeInfo.setChangeTable(event.getTable());
        dataChangeInfo.setChangeType("update");
        dataChangeInfo.setChangeTime(DateUtil.date());
        dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
        dataChangeDispatcher.dispatch(dataChangeInfo);
        log.info("监听到数据更新,原数据: {}, 新数据: {}", oldData, newData);
    }

    @Override
    public void onDelete(BinlogEvent<Object> event) {
        Object data = event.getData();
        log.info("监听到删除数据: {}", event.getData());
        DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
        dataChangeInfo.setOldData(data);
        dataChangeInfo.setNewData(null);
        dataChangeInfo.setChangeTable(event.getTable());
        dataChangeInfo.setChangeType("delete");
        dataChangeInfo.setChangeTime(DateUtil.date());
        dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
        dataChangeDispatcher.dispatch(dataChangeInfo);
    }

    @Override
    public boolean isHandle(String database, String table) {
        // 指定监听特定的数据库和表
        SysDictConstants.DataChangeTable tableInfo = SysDictConstants.DataChangeTable.getByTableName(table);
        if ("库名实例".equals(database) && tableInfo != null) {
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }
}

2. 数据变更分发器 (DataChangeDispatcher)

这是实现“同步/异步”灵活切换的关键组件。它根据配置决定是直接调用本地处理器(同步),还是将事件发布到Redis频道(异步)。

/**
 * 数据变更分发器
 */
@Slf4j
@Component
public class DataChangeDispatcher {

    @Resource
    private List<TableChangeHandler> tableHandlers;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Value("${binlog.async.enabled:true}")
    private boolean asyncEnabled;

    @Value("${binlog.redis.channel:binlog:data:change}")
    private String redisChannel;

    /**
     * 分发数据变更事件
     */
    public void dispatch(DataChangeInfo<Object> dataChangeInfo) {
        if (dataChangeInfo == null) {
            log.warn("dataChangeInfo is null, skip dispatch");
            return;
        }

        String tableName = dataChangeInfo.getChangeTable();
        log.info("开始分发数据变更事件: table={}, type={}",
            tableName, dataChangeInfo.getChangeType());

        // 判断是否异步处理
        if (asyncEnabled) {
            // Redis发布订阅方式异步处理
            publishToRedis(dataChangeInfo);
        } else {
            // 同步处理
            handleSynchronously(dataChangeInfo);
        }
    }

    /**
     * 同步处理:直接调用本地处理器
     */
    private void handleSynchronously(DataChangeInfo<Object> dataChangeInfo) {
        String tableName = dataChangeInfo.getChangeTable();

        // 找到对应的处理器
        TableChangeHandler handler = findHandler(tableName);
        if (handler == null) {
            log.warn("未找到表[{}]对应的处理器", tableName);
            return;
        }

        try {
            handler.process(dataChangeInfo);
        } catch (Exception e) {
            log.error("处理表[{}]数据变更异常", tableName, e);
        }
    }

    /**
     * 异步处理:发布到Redis
     */
    private void publishToRedis(DataChangeInfo<Object> dataChangeInfo) {
        try {
            // 构建Redis消息
            Map<String, Object> message = new HashMap<>();
            message.put("table", dataChangeInfo.getChangeTable());
            message.put("type", dataChangeInfo.getChangeType());
            message.put("data", JSON.toJSONString(dataChangeInfo));
            message.put("timestamp", System.currentTimeMillis());

            // 发布到Redis频道
            redisTemplate.convertAndSend(redisChannel, message);
            log.debug("已发布数据变更事件到Redis: channel={}, table={}",
                redisChannel, dataChangeInfo.getChangeTable());
        } catch (Exception e) {
            log.error("发布到Redis失败,降级为同步处理", e);
            handleSynchronously(dataChangeInfo);
        }
    }

    /**
     * 查找表对应的处理器
     */
    private TableChangeHandler findHandler(String tableName) {
        if (CollectionUtils.isEmpty(tableHandlers)) {
            return null;
        }

        for (TableChangeHandler handler : tableHandlers) {
            if (handler.getSupportedTable().equalsIgnoreCase(tableName)) {
                return handler;
            }
        }
        return null;
    }
}

3. 表变更处理器接口与抽象类

为了实现业务逻辑的隔离与扩展,我们定义了处理器接口和抽象基类。

处理器接口 (TableChangeHandler)

/**
 * @author zwmac
 */
public interface TableChangeHandler {
    /**
     * 获取支持的表名
     */
    String getSupportedTable();

    /**
     * 处理数据变更
     */
    void process(DataChangeInfo<Object> dataChangeInfo);

    /**
     * 是否支持该变更类型
     */
    default boolean supportChangeType(String changeType) {
        // 默认支持所有类型
        return true;
    }
}

抽象处理器 (AbstractTableChangeHandler)

/**
 * 表数据变更处理器抽象类
 * @author zwmac
 */
@Slf4j
public abstract class AbstractTableChangeHandler implements TableChangeHandler {

    @Override
    public void process(DataChangeInfo<Object> dataChangeInfo) {
        String changeType = dataChangeInfo.getChangeType();

        // 检查是否支持该变更类型
        if (!supportChangeType(changeType)) {
            log.debug("处理器[{}]不支持[{}]类型", getSupportedTable(), changeType);
            return;
        }

        try {
            switch (changeType) {
                case "insert":
                    handleInsert(dataChangeInfo);
                    break;
                case "update":
                    handleUpdate(dataChangeInfo);
                    break;
                case "delete":
                    handleDelete(dataChangeInfo);
                    break;
                default:
                    log.warn("未知的变更类型: {}", changeType);
            }
        } catch (Exception e) {
            log.error("处理表[{}]数据变更异常", getSupportedTable(), e);
            handleException(dataChangeInfo, e);
        }
    }

    /**
     * 处理插入
     */
    protected abstract void handleInsert(DataChangeInfo<Object> dataChangeInfo);

    /**
     * 处理更新
     */
    protected abstract void handleUpdate(DataChangeInfo<Object> dataChangeInfo);

    /**
     * 处理删除
     */
    protected abstract void handleDelete(DataChangeInfo<Object> dataChangeInfo);

    /**
     * 异常处理(可重写)
     */
    protected void handleException(DataChangeInfo<Object> dataChangeInfo, Exception e) {
        // 默认记录日志,子类可重写实现重试等逻辑
    }
}

新增一张表的处理逻辑时,只需继承AbstractTableChangeHandler,实现三个抽象方法,并将其注册为Spring Bean即可,扩展性非常好。

4. 数据载体与同步服务

数据变更信息载体 (DataChangeInfo)

/**
 * @author zwmac
 */
@Data
public class DataChangeInfo<T> {
    /**
     * 数据id,整个系统id就不统一,这里没办法,只能设置成string类型
     */
    private String id;
    /**
     * 数据变更的表名
     */
    private String changeTable;
    /**
     * 数据变更的类型,新增、修改、删除
     */
    private String changeType;

    /**
     * 数据变更的时间
     */
    private Date changeTime;
    /**
     * 变更前数据
     */
    private T oldData;
    /**
     * 变更后数据
     */
    private T newData;
}

同步处理服务 (DataChangeHandlerService & Impl)
DataChangeHandlerService作为一个统一的同步处理入口,起到了聚合与初步分发的作用。真正的业务处理逻辑,由各个具体的Service去实现。

/**
 * @author zwmac
 */
public interface DataChangeHandlerService {
    /**
     * 处理数据变更信息(这里相当于一个分发,其实可以发布消息,走订阅消费)
     *
     * @param dataChangeInfo 数据变更信息
     */
    void dealDataChangeInfo(DataChangeInfo<Object> dataChangeInfo);
}
/**
 * @author zwmac
 */
@Slf4j
@Service
public class DataChangeHandlerServiceImpl implements DataChangeHandlerService {

    @Resource
    private ICollegeSchoolMajorService collegeSchoolMajorService;

    @Override
    public void dealDataChangeInfo(DataChangeInfo<Object> dataChangeInfo) {
        if (dataChangeInfo == null) {
            log.warn("dataChangeInfo is null");
            return;
        }
        String changeTable = dataChangeInfo.getChangeTable();
        SysDictConstants.DataChangeTable tableInfo = SysDictConstants.DataChangeTable.getByTableName(changeTable);
        if (tableInfo == null) {
            log.warn("未找到对应的表名: {},不做处理", changeTable);
            return;
        }
        //TODO 这里可以配合redis的发布订阅解耦,获取其他消息中间件做解耦,
        //只要负责发出事件信息,其他服务消费做数据变更业务处理
        switch (tableInfo) {
            case T_COLLEGE_SCHOOL_MAJOR:
                // 处理表的数据变更
                collegeSchoolMajorService.dealCollegeSchoolMajorChange(dataChangeInfo);
                log.info("处理school_relative_label表的数据变更: {}", dataChangeInfo);
                break;
            default:
                log.warn("未找到对应的表名: {},不做处理", changeTable);
                break;
        }
    }
}

可以看出,DataChangeHandlerService只是一个统一入口,负责将事件路由到对应的具体业务Service(如collegeSchoolMajorService)进行处理。当然,你也可以选择不通过这个Service,而是让CollegeSchoolMajorHandler直接实现AbstractTableChangeHandler,在里面编写专属的业务逻辑,这样更加解耦。

架构优势总结

这套架构设计具有以下几个显著优点:

1. 高性能

  • 异步处理:通过Redis Pub/Sub实现完全异步,不阻塞binlog监听线程。
  • 并行消费:多个消费者可并行处理不同表的数据变更。
  • 非阻塞:业务处理失败不影响binlog监听。

2. 高解耦

  • 表与处理器解耦:通过TableChangeHandler接口实现表与业务逻辑的解耦。
  • 生产消费解耦:通过Redis消息队列解耦事件生产和消费。
  • 业务逻辑隔离:不同表的处理逻辑完全隔离,互不影响。

3. 强可扩展性

  • 新增表处理简单:只需实现TableChangeHandler接口并注册为Spring Bean。
  • 支持多种处理方式:可灵活配置同步、异步或组合使用。
  • 灵活的路由策略:可根据表名、变更类型等维度进行灵活路由。

4. 良好的容错性

  • 失败降级:Redis发布失败自动降级为同步处理,保证流程不中断。
  • 异常隔离:单个表处理异常不影响其他表的正常处理。
  • 可重试设计:可在具体的处理器(handleException)中方便地实现重试机制。

总结回顾

整体流程可以概括为:

  1. 统一监听DataChangeHandler 统一接收所有binlog事件。
  2. 智能分发DataChangeDispatcher 根据配置和表名,智能路由到同步处理或异步消息队列。
  3. 异步解耦:集成Redis Pub/Sub,实现业务处理的高性能与解耦。
  4. 易于扩展:新增表处理逻辑只需实现通用接口,符合开闭原则。
  5. 高可用保障:内置失败降级、异常隔离等容错机制。

这套架构已经在生产环境稳定运行,能够支撑每秒数千次的binlog事件处理,并且代码结构清晰,维护成本和扩展性都表现优异。对于寻求轻量级、高可控性MySQL数据同步方案的中小团队来说,是一个非常值得参考的实践。希望这次在云栈社区的分享能给大家带来一些切实的帮助。




上一篇:从P9高管吐槽帖看技术管理者困境:沟通障碍与团队效能反思
下一篇:Ghostty 1.3 发布:终端搜索、快捷键系统增强与性能优化
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-12 06:59 , Processed in 0.538783 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表