之前简单聊过利用binlog4j进行数据库变更监听,但更多是停留在概念层面。今天,我们来一次“干货”级别的实战,分享如何基于这套监听机制,结合Redis的发布订阅功能,构建一个解耦、高性能的数据变更处理架构。
下图清晰地展示了整个处理流程的架构:

核心思路是:binlog4j作为监听客户端捕获MySQL的binlog变更后,通过统一的handler入口进行处理。随后,事件可以根据业务需求选择两种路径:一是通过DataChangeHandlerService进行同步的、直接的业务处理;二是通过DataChangeDispatcher将事件发布到消息中间件(如Redis Pub/Sub),由其他服务异步订阅消费,实现业务解耦。
项目集成与配置
我这次是在一个基于老版本若依(RuoYi)框架的项目中进行集成。受限于框架版本,这里分享的是经过亲测可用的最高版本依赖。
Maven依赖 (pom.xml)
<dependency>
<groupId>com.gitee.Jmysy</groupId>
<artifactId>binlog4j-spring-boot-starter</artifactId>
<version>1.9.0</version>
</dependency>
YAML配置 (application.yml)
spring:
binlog4j:
client-configs:
master: # 客户端名称,可自定义
host: ip
port: 3306
username: 用户名
password: 密码
serverId: 12 # 伪装成从库的 ID,不可与 MySQL server-id 重复
persistence: false # 是否开启宕机续读(需要 Redis 支持)
redis-config: # 如果开启 persistence,需配置 Redis 存储位点
host: ip
port: 6379
password: 密码
database: 0 # 显式加上这个字段试试
注意:binlog4j的配置项是直接放在 spring: 根节点下的。
核心代码实现
1. 数据变更统一监听器 (DataChangeHandler)
这个类是binlog4j事件的统一入口,使用@BinlogSubscriber注解声明,并实现了IBinlogEventHandler接口。它负责将原始的binlog事件封装成统一的DataChangeInfo对象,然后分发给后续的服务。
/**
* 数据变化监听handler
*
* @author zwmac
*/
@Slf4j
@BinlogSubscriber(clientName = "master") // 对应 yml 中的客户端名称
public class DataChangeHandler implements IBinlogEventHandler<Object> {
@Resource
private DataChangeHandlerService dataChangeHandlerService;
@Resource
private DataChangeDispatcher dataChangeDispatcher;
@Override
public void onInsert(BinlogEvent<Object> event) {
Object data = event.getData();
DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
dataChangeInfo.setOldData(null);
dataChangeInfo.setNewData(data);
dataChangeInfo.setChangeTable(event.getTable());
dataChangeInfo.setChangeType("insert");
dataChangeInfo.setChangeTime(DateUtil.date());
dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
dataChangeDispatcher.dispatch(dataChangeInfo);
log.info("监听到插入数据: {}", data);
}
@Override
public void onUpdate(BinlogEvent<Object> event) {
// 更新前的数据
Object oldData = event.getOriginalData();
// 更新后的数据
Object newData = event.getData();
DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
dataChangeInfo.setOldData(oldData);
dataChangeInfo.setNewData(newData);
dataChangeInfo.setChangeTable(event.getTable());
dataChangeInfo.setChangeType("update");
dataChangeInfo.setChangeTime(DateUtil.date());
dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
dataChangeDispatcher.dispatch(dataChangeInfo);
log.info("监听到数据更新,原数据: {}, 新数据: {}", oldData, newData);
}
@Override
public void onDelete(BinlogEvent<Object> event) {
Object data = event.getData();
log.info("监听到删除数据: {}", event.getData());
DataChangeInfo<Object> dataChangeInfo = new DataChangeInfo<>();
dataChangeInfo.setOldData(data);
dataChangeInfo.setNewData(null);
dataChangeInfo.setChangeTable(event.getTable());
dataChangeInfo.setChangeType("delete");
dataChangeInfo.setChangeTime(DateUtil.date());
dataChangeHandlerService.dealDataChangeInfo(dataChangeInfo);
dataChangeDispatcher.dispatch(dataChangeInfo);
}
@Override
public boolean isHandle(String database, String table) {
// 指定监听特定的数据库和表
SysDictConstants.DataChangeTable tableInfo = SysDictConstants.DataChangeTable.getByTableName(table);
if ("库名实例".equals(database) && tableInfo != null) {
return Boolean.TRUE;
}
return Boolean.FALSE;
}
}
2. 数据变更分发器 (DataChangeDispatcher)
这是实现“同步/异步”灵活切换的关键组件。它根据配置决定是直接调用本地处理器(同步),还是将事件发布到Redis频道(异步)。
/**
* 数据变更分发器
*/
@Slf4j
@Component
public class DataChangeDispatcher {
@Resource
private List<TableChangeHandler> tableHandlers;
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Value("${binlog.async.enabled:true}")
private boolean asyncEnabled;
@Value("${binlog.redis.channel:binlog:data:change}")
private String redisChannel;
/**
* 分发数据变更事件
*/
public void dispatch(DataChangeInfo<Object> dataChangeInfo) {
if (dataChangeInfo == null) {
log.warn("dataChangeInfo is null, skip dispatch");
return;
}
String tableName = dataChangeInfo.getChangeTable();
log.info("开始分发数据变更事件: table={}, type={}",
tableName, dataChangeInfo.getChangeType());
// 判断是否异步处理
if (asyncEnabled) {
// Redis发布订阅方式异步处理
publishToRedis(dataChangeInfo);
} else {
// 同步处理
handleSynchronously(dataChangeInfo);
}
}
/**
* 同步处理:直接调用本地处理器
*/
private void handleSynchronously(DataChangeInfo<Object> dataChangeInfo) {
String tableName = dataChangeInfo.getChangeTable();
// 找到对应的处理器
TableChangeHandler handler = findHandler(tableName);
if (handler == null) {
log.warn("未找到表[{}]对应的处理器", tableName);
return;
}
try {
handler.process(dataChangeInfo);
} catch (Exception e) {
log.error("处理表[{}]数据变更异常", tableName, e);
}
}
/**
* 异步处理:发布到Redis
*/
private void publishToRedis(DataChangeInfo<Object> dataChangeInfo) {
try {
// 构建Redis消息
Map<String, Object> message = new HashMap<>();
message.put("table", dataChangeInfo.getChangeTable());
message.put("type", dataChangeInfo.getChangeType());
message.put("data", JSON.toJSONString(dataChangeInfo));
message.put("timestamp", System.currentTimeMillis());
// 发布到Redis频道
redisTemplate.convertAndSend(redisChannel, message);
log.debug("已发布数据变更事件到Redis: channel={}, table={}",
redisChannel, dataChangeInfo.getChangeTable());
} catch (Exception e) {
log.error("发布到Redis失败,降级为同步处理", e);
handleSynchronously(dataChangeInfo);
}
}
/**
* 查找表对应的处理器
*/
private TableChangeHandler findHandler(String tableName) {
if (CollectionUtils.isEmpty(tableHandlers)) {
return null;
}
for (TableChangeHandler handler : tableHandlers) {
if (handler.getSupportedTable().equalsIgnoreCase(tableName)) {
return handler;
}
}
return null;
}
}
3. 表变更处理器接口与抽象类
为了实现业务逻辑的隔离与扩展,我们定义了处理器接口和抽象基类。
处理器接口 (TableChangeHandler)
/**
* @author zwmac
*/
public interface TableChangeHandler {
/**
* 获取支持的表名
*/
String getSupportedTable();
/**
* 处理数据变更
*/
void process(DataChangeInfo<Object> dataChangeInfo);
/**
* 是否支持该变更类型
*/
default boolean supportChangeType(String changeType) {
// 默认支持所有类型
return true;
}
}
抽象处理器 (AbstractTableChangeHandler)
/**
* 表数据变更处理器抽象类
* @author zwmac
*/
@Slf4j
public abstract class AbstractTableChangeHandler implements TableChangeHandler {
@Override
public void process(DataChangeInfo<Object> dataChangeInfo) {
String changeType = dataChangeInfo.getChangeType();
// 检查是否支持该变更类型
if (!supportChangeType(changeType)) {
log.debug("处理器[{}]不支持[{}]类型", getSupportedTable(), changeType);
return;
}
try {
switch (changeType) {
case "insert":
handleInsert(dataChangeInfo);
break;
case "update":
handleUpdate(dataChangeInfo);
break;
case "delete":
handleDelete(dataChangeInfo);
break;
default:
log.warn("未知的变更类型: {}", changeType);
}
} catch (Exception e) {
log.error("处理表[{}]数据变更异常", getSupportedTable(), e);
handleException(dataChangeInfo, e);
}
}
/**
* 处理插入
*/
protected abstract void handleInsert(DataChangeInfo<Object> dataChangeInfo);
/**
* 处理更新
*/
protected abstract void handleUpdate(DataChangeInfo<Object> dataChangeInfo);
/**
* 处理删除
*/
protected abstract void handleDelete(DataChangeInfo<Object> dataChangeInfo);
/**
* 异常处理(可重写)
*/
protected void handleException(DataChangeInfo<Object> dataChangeInfo, Exception e) {
// 默认记录日志,子类可重写实现重试等逻辑
}
}
新增一张表的处理逻辑时,只需继承AbstractTableChangeHandler,实现三个抽象方法,并将其注册为Spring Bean即可,扩展性非常好。
4. 数据载体与同步服务
数据变更信息载体 (DataChangeInfo)
/**
* @author zwmac
*/
@Data
public class DataChangeInfo<T> {
/**
* 数据id,整个系统id就不统一,这里没办法,只能设置成string类型
*/
private String id;
/**
* 数据变更的表名
*/
private String changeTable;
/**
* 数据变更的类型,新增、修改、删除
*/
private String changeType;
/**
* 数据变更的时间
*/
private Date changeTime;
/**
* 变更前数据
*/
private T oldData;
/**
* 变更后数据
*/
private T newData;
}
同步处理服务 (DataChangeHandlerService & Impl)
DataChangeHandlerService作为一个统一的同步处理入口,起到了聚合与初步分发的作用。真正的业务处理逻辑,由各个具体的Service去实现。
/**
* @author zwmac
*/
public interface DataChangeHandlerService {
/**
* 处理数据变更信息(这里相当于一个分发,其实可以发布消息,走订阅消费)
*
* @param dataChangeInfo 数据变更信息
*/
void dealDataChangeInfo(DataChangeInfo<Object> dataChangeInfo);
}
/**
* @author zwmac
*/
@Slf4j
@Service
public class DataChangeHandlerServiceImpl implements DataChangeHandlerService {
@Resource
private ICollegeSchoolMajorService collegeSchoolMajorService;
@Override
public void dealDataChangeInfo(DataChangeInfo<Object> dataChangeInfo) {
if (dataChangeInfo == null) {
log.warn("dataChangeInfo is null");
return;
}
String changeTable = dataChangeInfo.getChangeTable();
SysDictConstants.DataChangeTable tableInfo = SysDictConstants.DataChangeTable.getByTableName(changeTable);
if (tableInfo == null) {
log.warn("未找到对应的表名: {},不做处理", changeTable);
return;
}
//TODO 这里可以配合redis的发布订阅解耦,获取其他消息中间件做解耦,
//只要负责发出事件信息,其他服务消费做数据变更业务处理
switch (tableInfo) {
case T_COLLEGE_SCHOOL_MAJOR:
// 处理表的数据变更
collegeSchoolMajorService.dealCollegeSchoolMajorChange(dataChangeInfo);
log.info("处理school_relative_label表的数据变更: {}", dataChangeInfo);
break;
default:
log.warn("未找到对应的表名: {},不做处理", changeTable);
break;
}
}
}
可以看出,DataChangeHandlerService只是一个统一入口,负责将事件路由到对应的具体业务Service(如collegeSchoolMajorService)进行处理。当然,你也可以选择不通过这个Service,而是让CollegeSchoolMajorHandler直接实现AbstractTableChangeHandler,在里面编写专属的业务逻辑,这样更加解耦。
架构优势总结
这套架构设计具有以下几个显著优点:
1. 高性能
- 异步处理:通过Redis Pub/Sub实现完全异步,不阻塞
binlog监听线程。
- 并行消费:多个消费者可并行处理不同表的数据变更。
- 非阻塞:业务处理失败不影响
binlog监听。
2. 高解耦
- 表与处理器解耦:通过
TableChangeHandler接口实现表与业务逻辑的解耦。
- 生产消费解耦:通过Redis消息队列解耦事件生产和消费。
- 业务逻辑隔离:不同表的处理逻辑完全隔离,互不影响。
3. 强可扩展性
- 新增表处理简单:只需实现
TableChangeHandler接口并注册为Spring Bean。
- 支持多种处理方式:可灵活配置同步、异步或组合使用。
- 灵活的路由策略:可根据表名、变更类型等维度进行灵活路由。
4. 良好的容错性
- 失败降级:Redis发布失败自动降级为同步处理,保证流程不中断。
- 异常隔离:单个表处理异常不影响其他表的正常处理。
- 可重试设计:可在具体的处理器(
handleException)中方便地实现重试机制。
总结回顾
整体流程可以概括为:
- 统一监听:
DataChangeHandler 统一接收所有binlog事件。
- 智能分发:
DataChangeDispatcher 根据配置和表名,智能路由到同步处理或异步消息队列。
- 异步解耦:集成Redis Pub/Sub,实现业务处理的高性能与解耦。
- 易于扩展:新增表处理逻辑只需实现通用接口,符合开闭原则。
- 高可用保障:内置失败降级、异常隔离等容错机制。
这套架构已经在生产环境稳定运行,能够支撑每秒数千次的binlog事件处理,并且代码结构清晰,维护成本和扩展性都表现优异。对于寻求轻量级、高可控性MySQL数据同步方案的中小团队来说,是一个非常值得参考的实践。希望这次在云栈社区的分享能给大家带来一些切实的帮助。