一、引言:真正要保证的,不是消息“到队列”,而是业务“最终成立”
在微服务、高并发和事件驱动架构中,消息队列已经是基础设施级组件。绝大多数系统都会用 MQ 做异步解耦、削峰填谷、广播通知、任务分发和跨系统集成。但在生产环境里,真正困难的从来不是“把消息发出去”,而是下面这些问题:
- 订单已经写入数据库,但消息没发出去,怎么办?
- 消息已经到 Broker,但消费者处理一半服务宕机,怎么算?
- 消费者收到同一条消息 3 次,库存会不会被扣成负数?
- 高峰期出现几十万条堆积,是扩容消费者、扩分区,还是限制上游?
- 跨库、跨服务场景下,如何在不引入重型分布式事务的前提下保证最终一致性?
这也是很多团队在做消息可靠性时最容易陷入的误区:把“MQ 是否可靠”误认为“业务是否可靠”。
从架构视角看,消息可靠投递不是单个中间件特性,而是一条端到端链路能力,包括:
- 生产侧是否能保证消息一定被构造、存储、投递
- Broker 是否能保证消息持久化、副本复制、故障恢复
- 消费侧是否能保证处理成功才确认、失败可重试、重复可去重
- 业务侧是否能保证状态机一致、补偿可执行、链路可观测
本文将从一个电商订单系统的主线出发,从失败模型、架构设计到具体的 Spring Boot 代码实现,系统地探讨如何将“可靠投递”从理论升级为可落地的生产级实践。
二、先统一概念:消息可靠性到底在保证什么
2.1 可靠性不是单点能力,而是链路语义
我们通常说“消息不丢”,其实至少包含四层语义:
- 生产可靠性
业务系统决定要发这条消息之后,不会因为进程崩溃、网络抖动、Broker 临时不可用而悄悄丢失。
- 存储可靠性
Broker 收到消息后,不会因为机器宕机、磁盘损坏、主从切换而丢失。
- 消费可靠性
消费者只会在业务成功后确认;业务失败时可重试;重试不会造成副作用放大。
- 业务一致性
即使消息重复、乱序、延迟甚至短暂失败,最终业务状态仍能收敛到正确结果。
2.2 业界真实语义:至少一次,远比“恰好一次”更常见
大部分业务系统最终能稳定落地的,不是严格意义上的“Exactly Once”,而是下面这个组合:
- 消息投递至少一次(At Least Once)
- 业务处理幂等
- 状态变更可重试、可补偿
这是因为“恰好一次”要成立,必须同时约束生产端、Broker、消费端、存储系统和外部依赖,成本极高,而且一旦穿透到数据库、第三方支付、库存系统、短信网关,就很难形成真正数学意义上的 Exactly Once。
所以对绝大多数互联网业务,正确表述应当是:
我们追求的不是消息只来一次,而是消息来多次、来得晚一点、甚至重试很多次,业务结果仍然是正确的。
2.3 消息可靠性的核心三板斧
在工程上,MQ 可靠性几乎都绕不开这三件事:
如果再往前一步到架构层,则需要补上两件事:
三、从失败模型出发:你到底在防什么
很多文章讲可靠投递,只讲功能,不讲故障。可真正的架构设计,必须围绕失败模型展开。
3.1 生产端常见失败
- 应用本地事务提交成功,但发送 MQ 之前进程宕机
- 消息已发送到 Broker,但生产端未收到 ACK,导致不确定是否成功
- 网络超时导致应用重试,从而发出重复消息
- Broker 短暂不可用,发送请求失败
3.2 Broker 侧常见失败
- Broker 收到消息但未落盘即宕机
- 主节点成功写入,但副本未同步完成就故障切换
- 分区 leader 切换导致短时不可写或重复投递
- 队列堆积过大,磁盘水位告警,写入被限流
3.3 消费端常见失败
- 消费者拿到消息,业务处理成功,但 ACK 失败,导致重复消费
- 业务处理一半失败,产生部分副作用
- 下游数据库、缓存、RPC 服务抖动,导致持续重试
- 消费组扩缩容、Rebalance、实例抖动引发消费暂停
3.4 业务层常见失败
- 扣库存成功,但订单状态未更新
- 支付单创建成功,但订单事件没推进
- 同一订单被重复支付、重复扣减、重复发券
- 死信消息没人处理,系统“看起来没报错,实际上静默失败”
3.5 可靠投递的本质
把上面的失败模型归纳起来,消息可靠投递的本质就是一句话:
在任意时刻发生网络异常、进程重启、节点故障、消息重复和顺序扰动时,系统都能最终把业务状态推进到正确结果,且可观测、可恢复、可追溯。
四、真实业务场景:高并发电商订单链路
为了把问题讲透,我们统一采用一个订单场景:
用户在大促期间提交订单,订单服务需要完成以下动作:
- 创建订单主记录
- 冻结库存
- 创建支付单
- 通知风控系统
- 推送履约系统准备发货
- 给营销系统发放积分或优惠券
系统特征如下:
- 峰值下单流量 2 万 QPS
- 订单创建链路需要快速返回,不能同步串行调用所有下游
- 允许短时间最终一致,但不能丢单
- 对库存、支付、优惠券等敏感资源必须防重
- 需要支持故障重试、人工补偿和消息重放
这类场景天然适合用事件驱动架构,但又不能直接“下单后发条 MQ”了事。因为订单入库和事件发布之间一旦断裂,就会造成业务事实与事件事实不一致。
五、架构升级:从“发送消息”升级为“可靠事件驱动”
5.1 推荐架构:事务型 Outbox + 可靠消费者 + 补偿治理
对于大多数以数据库为核心状态源的业务系统,最稳妥、最通用的方案不是强依赖 MQ 的事务能力,而是:
本地事务 + Outbox 表 + 异步投递 + 幂等消费 + 死信补偿
其核心链路如下:
5.2 为什么 Outbox 是生产环境里的高性价比方案
因为它解决了最经典的“双写一致性”问题:
如果这两个动作不在一个原子边界里,就一定存在:
而 Outbox 模式把“业务数据”和“待发送消息”放进同一个数据库事务,先确保事实落库,再由独立投递器异步把消息发到 MQ。这种方式有几个优势:
- 不依赖特定 MQ 的事务语义,迁移性更强
- 容易审计和补偿,消息发送状态可追踪
- 适合绝大多数订单、支付、库存、会员等核心业务
- 对团队能力要求更友好,复杂度可控
5.3 什么时候用 RocketMQ 事务消息
如果团队已经深度使用 RocketMQ,且业务链路天然适合借助 Broker 回查本地事务,那么事务消息也是非常强的方案,尤其适合:
- 跨服务链路长,对消息与事务一致性要求高
- 不希望额外维护 Outbox 扫描器
- 团队对 RocketMQ 运维和事务回查机制足够熟悉
但事务消息并不是“用了就万无一失”,仍然要解决:
- 本地事务执行超时
- 回查状态不明确
- 消费重复
- 下游幂等
所以要强调一点:事务消息解决的是生产侧双写问题,不会自动解决消费侧幂等和业务补偿问题。
六、三种主流方案对比:Outbox、事务消息、纯重试补偿
| 方案 |
原理 |
优点 |
缺点 |
适用场景 |
| 本地消息表 / Outbox |
业务数据与消息记录同库同事务提交,再异步投递 MQ |
最稳、可审计、与 MQ 解耦 |
需要维护扫描/投递器 |
订单、库存、支付、会员核心链路 |
| RocketMQ 事务消息 |
先发半消息,再执行本地事务,Broker 回查事务状态 |
原生支持事务链路 |
依赖 MQ 能力,治理复杂度高 |
强依赖 RocketMQ 的中大型系统 |
| 直接发送 + 补偿重试 |
业务完成后立即发消息,失败靠重试与人工补偿 |
简单,接入成本低 |
双写不一致风险大 |
非核心通知类、可容忍少量人工修复场景 |
结论很明确:
- 核心交易链路优先用 Outbox 或事务消息
- 通知类、日志类、埋点类可简化
- 不要把所有场景都一刀切成最复杂方案
七、主流 MQ 的可靠性能力与选型边界
7.1 RabbitMQ
RabbitMQ 的优势是低延迟、协议灵活、路由能力强,适合:
- 业务指令型消息
- 多路由、多交换机模型
- 中等规模高可靠业务系统
可靠性关键点:
- 持久化队列 + 持久化消息
- Publisher Confirm
- Mandatory + Return Callback
- 手动 ACK
- Quorum Queue 或镜像队列
- 死信交换机和延迟重试队列
注意点:
- 传统镜像队列已不再是新架构首选,生产环境建议优先评估 Quorum Queue
- 大量堆积时吞吐和磁盘压力需要重点关注
7.2 RocketMQ
RocketMQ 天然偏业务消息中台,适合:
可靠性关键点:
- 同步刷盘 / 异步刷盘策略
- 主从复制
- 事务消息回查
- 重试队列与死信队列
- 消费幂等
注意点:
- 事务消息非常强,但不是零成本能力
- 回查逻辑要可复用、可限流、可监控
7.3 Kafka
Kafka 更适合高吞吐事件流和日志流处理,尤其适合:
- 订单事件流
- 行为日志
- 风控埋点
- 流式计算
- CDC 数据集成
可靠性关键点:
acks=all
enable.idempotence=true
- 合理设置副本数和
min.insync.replicas
- 消费位点手动提交
- 分区键设计
注意点:
- Kafka 更擅长“事件流平台”,不一定天然适合所有强业务命令场景
- 业务 Exactly Once 不能简单等同于 Kafka Producer 幂等
7.4 一句话选型建议
- 业务命令、多路由、低延迟:RabbitMQ
- 交易链路、事务消息、顺序消费:RocketMQ
- 高吞吐事件流、日志流、大数据链路:Kafka
如果你的场景是“订单创建后驱动多个下游系统”,三者都能做,但:
- 以业务编排和事务一致为中心,RocketMQ 优势明显
- 以通用稳定和接入成本为中心,RabbitMQ 更常见
- 以事件平台和流处理为中心,Kafka 更合适
八、生产级架构设计:订单系统如何做可靠投递
8.1 逻辑分层
推荐把系统拆成四层:
- 交易写入层
负责接收下单请求、校验业务、写订单和 Outbox。
- 消息投递层
负责从 Outbox 扫描未发送消息,投递 MQ,并更新投递状态。
- 事件消费层
各领域服务消费消息,做幂等、状态推进、失败重试。
- 治理运维层
提供监控、告警、死信、重放、人工补偿、审计追踪能力。
8.2 主题与事件设计
订单系统不要一上来就设计几十个 Topic。更好的方式是先围绕领域事件建模:
order.created
order.paid
order.cancelled
inventory.reserved
inventory.reserve_failed
payment.created
payment.succeeded
事件体建议包含:
eventId:事件唯一标识
eventType:事件类型
aggregateId:聚合根 ID,例如订单 ID
bizKey:业务键,便于排障
occurredAt:业务发生时间
traceId:链路追踪 ID
payload:业务内容
version:事件版本,支持演进
8.3 顺序性设计
并不是所有消息都需要全局有序。生产环境里应当明确:
这意味着:
- Kafka 用
orderId 作为 key,确保同一订单落到同一分区
- RocketMQ 用顺序消息选择器按
orderId 取模
- RabbitMQ 用单订单串行语义时需额外设计路由和消费者模型
8.4 高并发下的关键取舍
要达到高可靠 + 高并发,必须做工程化取舍:
- 核心链路同步步骤尽量少,只保留订单写库和 Outbox 写入
- 非核心动作全部事件化
- 重试机制要带退避和上限,不能无限轰炸下游
- 幂等校验必须落库或落到强一致介质,不能只依赖进程内缓存
- 死信不是终点,要有重放平台和处理 SOP
九、生产级实现一:基于 Outbox 的可靠投递
下面给出一套更接近生产环境的 Spring Boot + JPA 实现骨架。
9.1 表结构设计
订单表
CREATE TABLE t_order (
id BIGSERIAL PRIMARY KEY,
order_id VARCHAR(64) NOT NULL UNIQUE,
user_id BIGINT NOT NULL,
amount NUMERIC(18,2) NOT NULL,
status VARCHAR(32) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
Outbox 表
CREATE TABLE t_outbox_event (
id BIGSERIAL PRIMARY KEY,
event_id VARCHAR(64) NOT NULL UNIQUE,
aggregate_type VARCHAR(64) NOT NULL,
aggregate_id VARCHAR(64) NOT NULL,
event_type VARCHAR(64) NOT NULL,
payload JSONB NOT NULL,
headers JSONB,
status VARCHAR(16) NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
next_retry_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_error TEXT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_outbox_status_retry
ON t_outbox_event(status, next_retry_time, id);
消费幂等表
CREATE TABLE t_message_consume_log (
id BIGSERIAL PRIMARY KEY,
consumer_group VARCHAR(128) NOT NULL,
message_id VARCHAR(64) NOT NULL,
biz_key VARCHAR(64),
status VARCHAR(16) NOT NULL,
consumed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE (consumer_group, message_id)
);
9.2 统一事件模型
public record DomainEvent<T>(
String eventId,
String eventType,
String aggregateType,
String aggregateId,
String bizKey,
Instant occurredAt,
String traceId,
int version,
T payload
) {
public static <T> DomainEvent<T> of(
String eventType,
String aggregateType,
String aggregateId,
String bizKey,
String traceId,
T payload
) {
return new DomainEvent<>(
UUID.randomUUID().toString(),
eventType,
aggregateType,
aggregateId,
bizKey,
Instant.now(),
traceId,
1,
payload
);
}
}
9.3 下单服务:业务数据与 Outbox 同事务提交
@Service
@RequiredArgsConstructor
public class OrderApplicationService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
@Transactional
public String createOrder(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
Order order = Order.create(
orderId,
command.userId(),
command.amount()
);
orderRepository.save(order);
OrderCreatedPayload payload = new OrderCreatedPayload(
orderId,
command.userId(),
command.amount(),
command.items()
);
DomainEvent<OrderCreatedPayload> event = DomainEvent.of(
"order.created",
"Order",
orderId,
orderId,
command.traceId(),
payload
);
OutboxEventEntity outbox = OutboxEventEntity.pending(
event.eventId(),
event.aggregateType(),
event.aggregateId(),
event.eventType(),
toJson(event),
"{\"source\":\"order-service\"}"
);
outboxEventRepository.save(outbox);
return orderId;
}
private String toJson(Object value) {
try {
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new IllegalStateException("serialize event failed", e);
}
}
}
这一段非常关键,它保证了:
- 订单写入成功时,待发送事件一定存在
- 订单没写入成功时,事件也不会凭空出现
这就把“业务事实”和“消息事实”放进了同一个原子边界。
9.4 Outbox Relay:扫描、投递、确认
生产环境不建议简单用定时任务串行扫表,而应考虑:
- 批量拉取
- 分页/分片
- 状态锁定
- 重试退避
- 多实例并发投递
下面给一个简化但工程可用的实现骨架:
@Service
@RequiredArgsConstructor
public class OutboxRelayJob {
private final OutboxEventRepository outboxEventRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedDelayString = "${outbox.relay.fixed-delay-ms:500}")
public void relay() {
List<OutboxEventEntity> events =
outboxEventRepository.lockTopNForSend(200, LocalDateTime.now());
for (OutboxEventEntity event : events) {
try {
SendResult<String, String> result = kafkaTemplate.send(
"order-domain-event",
event.getAggregateId(),
event.getPayload()
).get(3, TimeUnit.SECONDS);
event.markSent(
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset()
);
outboxEventRepository.save(event);
} catch (Exception ex) {
event.markRetry(nextRetryAt(event.getRetryCount()), rootMessage(ex));
outboxEventRepository.save(event);
}
}
}
private LocalDateTime nextRetryAt(int retryCount) {
long seconds = Math.min(300, (long) Math.pow(2, retryCount));
return LocalDateTime.now().plusSeconds(seconds);
}
private String rootMessage(Throwable ex) {
Throwable current = ex;
while (current.getCause() != null) {
current = current.getCause();
}
return current.getMessage();
}
}
与演示代码相比,这里多了几个生产级要点:
- 只扫描“待发送且到达下次重试时间”的记录
- 每次批量拉取,避免全表扫描
- 发送成功后记录 topic、partition、offset,便于排障
- 失败后写回重试次数和下次执行时间,形成退避机制
9.5 仓储层:避免多实例重复扫描
如果 Outbox Relay 有多个实例,需要避免同一条消息被同时发送。常见做法包括:
SELECT ... FOR UPDATE SKIP LOCKED
- 基于状态机抢占,例如
PENDING -> SENDING
- 按主键范围分片
- 基于一致性哈希做水平切分
例如 PostgreSQL 可用:
SELECT *
FROM t_outbox_event
WHERE status = 'PENDING'
AND next_retry_time <= NOW()
ORDER BY id
LIMIT 200
FOR UPDATE SKIP LOCKED;
9.6 为什么这套方案在高并发下依然可用
因为下单主链路只做两次本地写操作:
两者同库同事务,性能模型清晰,数据库层面容易优化。而真正的 MQ 发送、重试和限流都被移到了异步投递层,这样前台 RT 和成功率会明显更稳。
十、生产级实现二:可靠消费者必须是“幂等 + 手动确认 + 可重试”
消费端是很多系统真正出事故最多的地方。因为只要 ACK 语义和业务语义没对齐,就一定会出现:
10.1 幂等不是 Redis 锁,而是业务去重
很多团队一说幂等就上 Redis setnx 锁,这只能解决“同一时刻并发进入”的一部分问题,解决不了:
- 服务重启后的重复投递
- ACK 失败后的重投
- 延迟重试后的再次到达
真正稳定的方案,是基于消息唯一 ID 或业务唯一键落库去重。
10.2 Kafka 消费者生产级骨架
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryOrderCreatedConsumer {
private final ConsumeLogRepository consumeLogRepository;
private final InventoryDomainService inventoryDomainService;
private final ObjectMapper objectMapper;
@KafkaListener(
topics = "order-domain-event",
groupId = "inventory-consumer-group",
concurrency = "12",
containerFactory = "kafkaManualAckListenerFactory"
)
@Transactional
public void onMessage(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment
) throws Exception {
DomainEvent<OrderCreatedPayload> event = objectMapper.readValue(
record.value(),
new TypeReference<>() {}
);
if (!"order.created".equals(event.eventType())) {
acknowledgment.acknowledge();
return;
}
boolean inserted = consumeLogRepository.insertIfAbsent(
"inventory-consumer-group",
event.eventId(),
event.bizKey()
);
if (!inserted) {
log.info("duplicate message ignored, eventId={}", event.eventId());
acknowledgment.acknowledge();
return;
}
try {
inventoryDomainService.reserve(
event.aggregateId(),
event.payload().items()
);
consumeLogRepository.markSuccess(
"inventory-consumer-group",
event.eventId()
);
acknowledgment.acknowledge();
} catch (Throwable ex) {
consumeLogRepository.markFailed(
"inventory-consumer-group",
event.eventId(),
ex.getMessage()
);
throw ex;
}
}
}
这个实现体现了消费端的核心原则:
- 先幂等登记,再执行业务
- 业务成功后再 ACK
- 业务异常时不 ACK,让框架重试或进入错误处理器
10.3 幂等表插入示例
@Repository
@RequiredArgsConstructor
public class ConsumeLogRepository {
private final JdbcTemplate jdbcTemplate;
public boolean insertIfAbsent(String consumerGroup, String messageId, String bizKey) {
String sql = """
INSERT INTO t_message_consume_log(consumer_group, message_id, biz_key, status)
VALUES (?, ?, ?, 'PROCESSING')
ON CONFLICT (consumer_group, message_id) DO NOTHING
""";
return jdbcTemplate.update(sql, consumerGroup, messageId, bizKey) > 0;
}
public void markSuccess(String consumerGroup, String messageId) {
jdbcTemplate.update("""
UPDATE t_message_consume_log
SET status = 'SUCCESS', consumed_at = CURRENT_TIMESTAMP
WHERE consumer_group = ? AND message_id = ?
""", consumerGroup, messageId);
}
public void markFailed(String consumerGroup, String messageId, String error) {
jdbcTemplate.update("""
UPDATE t_message_consume_log
SET status = 'FAILED'
WHERE consumer_group = ? AND message_id = ?
""", consumerGroup, messageId);
}
}
10.4 库存服务中的状态机设计
库存操作不要直接写成“收到消息就减库存”,更稳妥的做法是:
INIT
RESERVED
CONFIRMED
RELEASED
也就是说,订单创建后先冻结库存,支付成功后再确认扣减,订单取消则释放冻结库存。这样即使链路抖动,也更容易做状态收敛和补偿。
十一、生产级实现三:RocketMQ 事务消息如何正确使用
如果项目选用 RocketMQ,事务消息是非常实用的能力。它的核心流程是:
- 生产者发送半消息(对消费者不可见)
- 执行本地事务
- 根据本地事务结果提交或回滚半消息
- 如果 Broker 未收到明确结果,则主动回查生产者
11.1 事务消息生产端示例
@Service
@RequiredArgsConstructor
public class OrderTransactionMessageService {
private final RocketMQTemplate rocketMQTemplate;
public void createOrder(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
OrderCreatedPayload payload = new OrderCreatedPayload(
orderId,
command.userId(),
command.amount(),
command.items()
);
Message<OrderCreatedPayload> message = MessageBuilder
.withPayload(payload)
.setHeader("eventType", "order.created")
.setHeader(RocketMQHeaders.KEYS, orderId)
.build();
rocketMQTemplate.sendMessageInTransaction(
"order-tx-producer-group",
"order-created-topic",
message,
command
);
}
}
11.2 本地事务监听器
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "order-tx-producer-group")
@RequiredArgsConstructor
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
private final OrderRepository orderRepository;
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
CreateOrderCommand command = (CreateOrderCommand) arg;
OrderCreatedPayload payload = (OrderCreatedPayload) msg.getPayload();
Order order = Order.create(
payload.orderId(),
command.userId(),
command.amount()
);
orderRepository.save(order);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception ex) {
log.error("execute local transaction failed", ex);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
OrderCreatedPayload payload = (OrderCreatedPayload) msg.getPayload();
boolean exists = orderRepository.existsByOrderId(payload.orderId());
return exists
? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}
}
11.3 使用事务消息的几个架构注意点
- 回查接口必须幂等且极轻量,通常只查本地事务结果
- 不要在本地事务里做复杂远程调用
- 事务状态一定要可审计,否则排障成本极高
- 即使使用事务消息,消费端仍要做幂等
十二、RabbitMQ 可靠投递如何落到生产环境
RabbitMQ 适合业务命令型消息,但要想真正可靠,必须把以下配置和编码模式一起用起来。
12.1 生产端关键配置
spring:
rabbitmq:
host: rabbitmq.prod
port: 5672
username: app_user
password: secret
virtual-host: /trade
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
concurrency: 8
max-concurrency: 32
prefetch: 100
12.2 生产者发送示例
@Service
@RequiredArgsConstructor
public class RabbitOrderEventPublisher {
private final RabbitTemplate rabbitTemplate;
public void publish(String exchange, String routingKey, String payload, String eventId) {
CorrelationData correlationData = new CorrelationData(eventId);
rabbitTemplate.convertAndSend(
exchange,
routingKey,
payload,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setMessageId(eventId);
return message;
},
correlationData
);
}
}
12.3 Confirm 与 Return 必须分清楚
- Confirm Callback:Broker 是否收到消息
- Return Callback:消息是否成功路由到队列
很多系统只配 Confirm,不配 Return,结果消息到了交换机却没队列接收,最终仍然等价于丢消息。
12.4 消费端建议
- 手动 ACK
- 失败进入死信,不要无限原地重回队列
- 基于延迟队列做阶梯重试,例如 1 分钟、5 分钟、30 分钟
- 对热点业务控制
prefetch,避免单实例囤积过多未确认消息
十三、Kafka 生产级可靠性参数与消费策略
13.1 Producer 配置建议
spring:
kafka:
bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
producer:
acks: all
retries: 10
batch-size: 65536
buffer-memory: 67108864
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
compression.type: zstd
consumer:
enable-auto-commit: false
auto-offset-reset: latest
max-poll-records: 500
properties:
isolation.level: read_committed
13.2 关键解释
acks=all:只有所有 ISR 副本满足条件后才认为成功
enable.idempotence=true:减少因重试导致的生产端重复
enable-auto-commit=false:位点提交交给业务处理结果控制
read_committed:如果使用事务生产者,消费者只读已提交事务消息
13.3 Kafka 高并发下的重点不是“能发多快”,而是“分区是否够用”
Kafka 的并行上限通常取决于:
- Topic 分区数
- 消费组实例数
- 单条消息处理时长
- 下游数据库/缓存吞吐能力
如果库存服务的消费组有 12 个实例,但 Topic 只有 6 个分区,那么并发上限就是 6,不是 12。
因此容量规划时必须把“业务吞吐目标”转换成“分区数、消费者实例数、单实例处理能力”的组合设计。
十四、高并发工程化升级:不仅要可靠,还要扛流量
14.1 入口削峰:写路径做最小化
高峰期最容易崩的是同步长链路。正确做法是:
- 请求入口快速校验
- 核心交易数据落库
- 事件异步化
- 非核心动作延迟执行
也就是说,在大促下单场景中,真正同步保留在主链路里的,应该只有:
而库存冻结、支付单创建、营销通知、积分发放等都走事件链路。
14.2 消费端扩展:水平扩容不是无限有效
消费者扩容的收益受以下因素影响:
- Topic/Queue/Partition 数量
- 数据库连接池
- 下游接口限流阈值
- 单条消息处理成本
如果一个消息要串行调用 3 个下游 RPC,即使消费者线程再多,也会很快卡在网络和下游资源上。因此消费逻辑的设计原则是:
- 单条消息尽量只做一个核心动作
- 把复杂编排拆成多个事件阶段
- 失败快速返回,交给重试系统
14.3 批量消费与批量落库
对于日志、埋点、风控等非强事务事件,可以批量化以提升吞吐:
- Kafka 批量拉取后批量写入
- RabbitMQ 开启合理 prefetch,减少网络往返
- Outbox 扫描器批量发送
- 幂等表批量插入
但对订单、库存、支付等交易型消息,要谨慎使用过大的批量,否则单批失败的回滚和排障成本很高。
14.4 退避重试要有限制
一个成熟的重试策略通常包含:
- 最大重试次数
- 指数退避
- 抖动因子
- 可重试错误白名单
- 不可重试错误直接死信
例如:
- 库存服务超时:重试
- 参数非法:不重试
- 商品已下架:业务拒绝,直接结束
- 数据库主从切换瞬时失败:短暂重试
14.5 限流、熔断与隔离
如果消费端下游依赖不稳定,继续高速消费只会扩大事故面。生产环境里建议:
- 对下游 RPC 配置限流与熔断
- 区分核心 Topic 与普通 Topic
- 对慢消费者单独隔离消费组
- 对 Outbox Relay 配置发送速率上限
十五、生产级监控与治理:没有观测,就没有可靠性
一个系统哪怕设计再好,如果没有观测能力,也无法证明它可靠。
15.1 必须监控的核心指标
生产侧
- 消息发送成功率
- Broker ACK 延迟
- Producer 重试次数
- Outbox 待发送数量
- Outbox 最老未发送消息年龄
Broker 侧
- Topic/Queue 堆积量
- 分区 leader 切换次数
- ISR 变化
- 磁盘使用率
- 网络吞吐
消费侧
- 消费成功率
- 单条消息处理耗时
- 重试次数
- 死信数量
- 消费延迟
- Rebalance 次数
业务侧
- 订单创建成功但未冻结库存数
- 支付成功但订单状态未推进数
- 死信积压超时数
- 人工补偿完成率
15.2 推荐告警项
| 告警项 |
建议阈值 |
说明 |
| Outbox 未发送积压 |
超过 5 分钟持续增长 |
说明投递层异常 |
| Topic 消费延迟 |
超过 SLA |
说明下游处理能力不足 |
| 死信速率突增 |
环比激增 |
说明代码或依赖异常 |
| 重试率 > 5% |
持续 10 分钟 |
说明系统进入不稳定状态 |
| 消费失败率 > 1% |
核心链路立即告警 |
交易链路必须高敏感 |
15.3 可观测性链路建议
每条消息都建议带上:
traceId
eventId
aggregateId
producerApp
consumerGroup
这样才能实现:
- 从一次投诉订单反查整条事件链路
- 从一条死信追溯生产端、Broker、消费端全过程
- 从链路追踪系统定位哪个环节 RT 飙升
十六、灾备与恢复:可靠性不是“不出错”,而是“出错也能恢复”
16.1 死信队列不是垃圾桶
很多团队把 DLQ 配出来就算完成,其实这只做了 30%。真正完整的死信治理要包含:
- 死信分类
- 根因标签
- 自动重放规则
- 人工审核入口
- 操作审计日志
16.2 建议建设“消息补偿平台”
平台至少要支持:
- 按订单号、事件 ID 查询消息轨迹
- 查看生产状态、消费状态、重试次数、失败原因
- 手动重发
- 重发前修改目标 Topic/Queue 或延迟级别
- 批量恢复某一时间窗口的异常消息
16.3 容灾演练必须常态化
至少要演练这几类故障:
- Broker 单节点宕机
- 分区 leader 切换
- 消费者批量重启
- 数据库闪断
- 下游库存服务不可用
- Outbox Relay 长时间停止
演练目标不是“系统没报错”,而是确认:
- 数据是否最终收敛
- 告警是否及时触发
- 补偿是否可执行
- RTO/RPO 是否满足要求
十七、常见误区:很多系统不是败给 MQ,而是败给认知偏差
17.1 误区一:Broker 持久化了,就等于业务可靠
错。Broker 持久化只解决“消息存住了”,不解决:
- 数据与消息双写一致性
- 消费端重复执行
- 业务状态不一致
17.2 误区二:用了 Redis 锁,就等于幂等
错。Redis 锁解决的是并发竞争,不等于持久化幂等语义。真正可靠的幂等必须基于业务唯一键、消息唯一键和可恢复的存储介质。
17.3 误区三:无限重试一定能成功
错。无限重试经常会把局部故障放大成系统雪崩。正确做法是:
17.4 误区四:所有消息都要严格顺序
错。全局顺序的代价极高,大多数业务只需要“同一业务键局部有序”。
17.5 误区五:Exactly Once 是默认目标
错。对大多数业务系统,更现实、更稳妥的目标是:
至少一次投递 + 幂等消费 + 状态收敛 + 可补偿
十八、案例落地:一条订单消息如何在生产环境闭环
下面用完整链路把前面的设计串起来。
18.1 场景
用户下单成功后,需要完成:
18.2 处理流程
- 订单服务收到请求,校验参数和风控规则。
- 在同一个数据库事务里写入
t_order 和 t_outbox_event。
- Outbox Relay 扫描到
order.created 事件,投递到 order-domain-event Topic。
- 库存服务消费该事件,先插入消费幂等表,成功后冻结库存,再 ACK。
- 支付服务消费同一事件,创建支付单并 ACK。
- 营销服务消费后调用发券逻辑,若发券系统超时则进入延迟重试。
- 某个服务重试超过阈值则进入死信队列,并触发告警。
- 运维或业务通过补偿平台查询事件轨迹,确认根因后执行重放。
18.3 为什么这套流程能抗住高并发与故障
- 下单主链路极短,只做本地事务
- 所有异步步骤都支持失败重试
- 消费重复不会重复扣库存或重复建支付单
- 每个节点都有观测点
- 出现问题时不是“数据黑洞”,而是“可恢复状态”
十九、文章级总结:一套可执行的设计准则
如果你要在团队里推动消息可靠投递体系建设,可以直接按下面这套准则落地:
19.1 核心设计准则
- 核心交易链路优先解决“双写一致性”,推荐 Outbox 或事务消息。
- 消费端默认按 At Least Once 设计,靠幂等保证业务正确。
- ACK 必须晚于业务成功,不能“先确认后处理”。
- 重试必须有边界,失败最终进入死信和补偿流程。
- 顺序性只保证到业务键级别,不追求全局顺序。
- 可靠性必须可观测、可审计、可重放。
19.2 技术选型准则
- 交易业务强一致诉求高:优先 RocketMQ 事务消息或 Outbox
- 通用业务消息、多路由能力强:RabbitMQ
- 高吞吐事件流、大数据链路:Kafka
19.3 工程实施准则
- 数据库表结构先设计好状态字段、重试字段、索引
- 统一事件模型、统一 traceId、统一幂等方案
- 建立 DLQ 和补偿平台,而不是只依赖日志排障
- 把堆积、延迟、失败率和死信纳入 SLO 指标
二十、结语:可靠投递不是中间件特性,而是系统工程能力
真正成熟的消息可靠性体系,从来不是“我会配 RabbitMQ Confirm”或者“我会用 Kafka acks=all”这么简单。
它本质上是一个系统工程问题,涉及:
- 分布式事务边界设计
- 状态机建模
- 幂等语义设计
- 高并发吞吐优化
- 死信补偿治理
- 监控审计与故障演练
如果只盯着 MQ 参数,很容易得到一个“看起来可靠”的系统;如果从业务事实、失败模型和恢复能力出发,才能真正建设出一个“即使出错也能收敛”的系统。
最后给一个可以直接带走的结论:
在绝大多数互联网业务里,消息可靠投递的最优解不是追求神话般的 Exactly Once,而是用本地事务或事务消息保证生产侧一致性,用幂等和状态机保证消费侧正确性,用重试、死信和补偿保证系统最终收敛。
这,才是消息队列可靠投递从理论走向生产实践的完整答案。你可以在 云栈社区 找到更多关于分布式系统和架构设计的深度讨论与实战案例。