找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3041

积分

0

好友

411

主题
发表于 前天 03:05 | 查看: 9| 回复: 0

一、引言:真正要保证的,不是消息“到队列”,而是业务“最终成立”

在微服务、高并发和事件驱动架构中,消息队列已经是基础设施级组件。绝大多数系统都会用 MQ 做异步解耦、削峰填谷、广播通知、任务分发和跨系统集成。但在生产环境里,真正困难的从来不是“把消息发出去”,而是下面这些问题:

  • 订单已经写入数据库,但消息没发出去,怎么办?
  • 消息已经到 Broker,但消费者处理一半服务宕机,怎么算?
  • 消费者收到同一条消息 3 次,库存会不会被扣成负数?
  • 高峰期出现几十万条堆积,是扩容消费者、扩分区,还是限制上游?
  • 跨库、跨服务场景下,如何在不引入重型分布式事务的前提下保证最终一致性?

这也是很多团队在做消息可靠性时最容易陷入的误区:把“MQ 是否可靠”误认为“业务是否可靠”

从架构视角看,消息可靠投递不是单个中间件特性,而是一条端到端链路能力,包括:

  • 生产侧是否能保证消息一定被构造、存储、投递
  • Broker 是否能保证消息持久化、副本复制、故障恢复
  • 消费侧是否能保证处理成功才确认、失败可重试、重复可去重
  • 业务侧是否能保证状态机一致、补偿可执行、链路可观测

本文将从一个电商订单系统的主线出发,从失败模型、架构设计到具体的 Spring Boot 代码实现,系统地探讨如何将“可靠投递”从理论升级为可落地的生产级实践。

二、先统一概念:消息可靠性到底在保证什么

2.1 可靠性不是单点能力,而是链路语义

我们通常说“消息不丢”,其实至少包含四层语义:

  1. 生产可靠性
    业务系统决定要发这条消息之后,不会因为进程崩溃、网络抖动、Broker 临时不可用而悄悄丢失。
  2. 存储可靠性
    Broker 收到消息后,不会因为机器宕机、磁盘损坏、主从切换而丢失。
  3. 消费可靠性
    消费者只会在业务成功后确认;业务失败时可重试;重试不会造成副作用放大。
  4. 业务一致性
    即使消息重复、乱序、延迟甚至短暂失败,最终业务状态仍能收敛到正确结果。

2.2 业界真实语义:至少一次,远比“恰好一次”更常见

大部分业务系统最终能稳定落地的,不是严格意义上的“Exactly Once”,而是下面这个组合:

  • 消息投递至少一次(At Least Once)
  • 业务处理幂等
  • 状态变更可重试、可补偿

这是因为“恰好一次”要成立,必须同时约束生产端、Broker、消费端、存储系统和外部依赖,成本极高,而且一旦穿透到数据库、第三方支付、库存系统、短信网关,就很难形成真正数学意义上的 Exactly Once。

所以对绝大多数互联网业务,正确表述应当是:

我们追求的不是消息只来一次,而是消息来多次、来得晚一点、甚至重试很多次,业务结果仍然是正确的。

2.3 消息可靠性的核心三板斧

在工程上,MQ 可靠性几乎都绕不开这三件事:

  • 持久化
  • 确认机制
  • 幂等处理

如果再往前一步到架构层,则需要补上两件事:

  • 事务衔接能力
  • 失败补偿与治理能力

三、从失败模型出发:你到底在防什么

很多文章讲可靠投递,只讲功能,不讲故障。可真正的架构设计,必须围绕失败模型展开。

3.1 生产端常见失败

  • 应用本地事务提交成功,但发送 MQ 之前进程宕机
  • 消息已发送到 Broker,但生产端未收到 ACK,导致不确定是否成功
  • 网络超时导致应用重试,从而发出重复消息
  • Broker 短暂不可用,发送请求失败

3.2 Broker 侧常见失败

  • Broker 收到消息但未落盘即宕机
  • 主节点成功写入,但副本未同步完成就故障切换
  • 分区 leader 切换导致短时不可写或重复投递
  • 队列堆积过大,磁盘水位告警,写入被限流

3.3 消费端常见失败

  • 消费者拿到消息,业务处理成功,但 ACK 失败,导致重复消费
  • 业务处理一半失败,产生部分副作用
  • 下游数据库、缓存、RPC 服务抖动,导致持续重试
  • 消费组扩缩容、Rebalance、实例抖动引发消费暂停

3.4 业务层常见失败

  • 扣库存成功,但订单状态未更新
  • 支付单创建成功,但订单事件没推进
  • 同一订单被重复支付、重复扣减、重复发券
  • 死信消息没人处理,系统“看起来没报错,实际上静默失败”

3.5 可靠投递的本质

把上面的失败模型归纳起来,消息可靠投递的本质就是一句话:

在任意时刻发生网络异常、进程重启、节点故障、消息重复和顺序扰动时,系统都能最终把业务状态推进到正确结果,且可观测、可恢复、可追溯。

四、真实业务场景:高并发电商订单链路

为了把问题讲透,我们统一采用一个订单场景:

用户在大促期间提交订单,订单服务需要完成以下动作:

  1. 创建订单主记录
  2. 冻结库存
  3. 创建支付单
  4. 通知风控系统
  5. 推送履约系统准备发货
  6. 给营销系统发放积分或优惠券

系统特征如下:

  • 峰值下单流量 2 万 QPS
  • 订单创建链路需要快速返回,不能同步串行调用所有下游
  • 允许短时间最终一致,但不能丢单
  • 对库存、支付、优惠券等敏感资源必须防重
  • 需要支持故障重试、人工补偿和消息重放

这类场景天然适合用事件驱动架构,但又不能直接“下单后发条 MQ”了事。因为订单入库和事件发布之间一旦断裂,就会造成业务事实与事件事实不一致。

五、架构升级:从“发送消息”升级为“可靠事件驱动”

5.1 推荐架构:事务型 Outbox + 可靠消费者 + 补偿治理

对于大多数以数据库为核心状态源的业务系统,最稳妥、最通用的方案不是强依赖 MQ 的事务能力,而是:

本地事务 + Outbox 表 + 异步投递 + 幂等消费 + 死信补偿

其核心链路如下:

5.2 为什么 Outbox 是生产环境里的高性价比方案

因为它解决了最经典的“双写一致性”问题:

  • 业务写数据库
  • 同时发消息到 MQ

如果这两个动作不在一个原子边界里,就一定存在:

  • 数据成功、消息失败
  • 消息成功、数据失败

而 Outbox 模式把“业务数据”和“待发送消息”放进同一个数据库事务,先确保事实落库,再由独立投递器异步把消息发到 MQ。这种方式有几个优势:

  • 不依赖特定 MQ 的事务语义,迁移性更强
  • 容易审计和补偿,消息发送状态可追踪
  • 适合绝大多数订单、支付、库存、会员等核心业务
  • 对团队能力要求更友好,复杂度可控

5.3 什么时候用 RocketMQ 事务消息

如果团队已经深度使用 RocketMQ,且业务链路天然适合借助 Broker 回查本地事务,那么事务消息也是非常强的方案,尤其适合:

  • 跨服务链路长,对消息与事务一致性要求高
  • 不希望额外维护 Outbox 扫描器
  • 团队对 RocketMQ 运维和事务回查机制足够熟悉

但事务消息并不是“用了就万无一失”,仍然要解决:

  • 本地事务执行超时
  • 回查状态不明确
  • 消费重复
  • 下游幂等

所以要强调一点:事务消息解决的是生产侧双写问题,不会自动解决消费侧幂等和业务补偿问题。

六、三种主流方案对比:Outbox、事务消息、纯重试补偿

方案 原理 优点 缺点 适用场景
本地消息表 / Outbox 业务数据与消息记录同库同事务提交,再异步投递 MQ 最稳、可审计、与 MQ 解耦 需要维护扫描/投递器 订单、库存、支付、会员核心链路
RocketMQ 事务消息 先发半消息,再执行本地事务,Broker 回查事务状态 原生支持事务链路 依赖 MQ 能力,治理复杂度高 强依赖 RocketMQ 的中大型系统
直接发送 + 补偿重试 业务完成后立即发消息,失败靠重试与人工补偿 简单,接入成本低 双写不一致风险大 非核心通知类、可容忍少量人工修复场景

结论很明确:

  • 核心交易链路优先用 Outbox 或事务消息
  • 通知类、日志类、埋点类可简化
  • 不要把所有场景都一刀切成最复杂方案

七、主流 MQ 的可靠性能力与选型边界

7.1 RabbitMQ

RabbitMQ 的优势是低延迟、协议灵活、路由能力强,适合:

  • 业务指令型消息
  • 多路由、多交换机模型
  • 中等规模高可靠业务系统

可靠性关键点:

  • 持久化队列 + 持久化消息
  • Publisher Confirm
  • Mandatory + Return Callback
  • 手动 ACK
  • Quorum Queue 或镜像队列
  • 死信交换机和延迟重试队列

注意点:

  • 传统镜像队列已不再是新架构首选,生产环境建议优先评估 Quorum Queue
  • 大量堆积时吞吐和磁盘压力需要重点关注

7.2 RocketMQ

RocketMQ 天然偏业务消息中台,适合:

  • 交易型消息
  • 顺序消息
  • 延迟消息
  • 事务消息

可靠性关键点:

  • 同步刷盘 / 异步刷盘策略
  • 主从复制
  • 事务消息回查
  • 重试队列与死信队列
  • 消费幂等

注意点:

  • 事务消息非常强,但不是零成本能力
  • 回查逻辑要可复用、可限流、可监控

7.3 Kafka

Kafka 更适合高吞吐事件流和日志流处理,尤其适合:

  • 订单事件流
  • 行为日志
  • 风控埋点
  • 流式计算
  • CDC 数据集成

可靠性关键点:

  • acks=all
  • enable.idempotence=true
  • 合理设置副本数和 min.insync.replicas
  • 消费位点手动提交
  • 分区键设计

注意点:

  • Kafka 更擅长“事件流平台”,不一定天然适合所有强业务命令场景
  • 业务 Exactly Once 不能简单等同于 Kafka Producer 幂等

7.4 一句话选型建议

  • 业务命令、多路由、低延迟:RabbitMQ
  • 交易链路、事务消息、顺序消费:RocketMQ
  • 高吞吐事件流、日志流、大数据链路:Kafka

如果你的场景是“订单创建后驱动多个下游系统”,三者都能做,但:

  • 以业务编排和事务一致为中心,RocketMQ 优势明显
  • 以通用稳定和接入成本为中心,RabbitMQ 更常见
  • 以事件平台和流处理为中心,Kafka 更合适

八、生产级架构设计:订单系统如何做可靠投递

8.1 逻辑分层

推荐把系统拆成四层:

  1. 交易写入层
    负责接收下单请求、校验业务、写订单和 Outbox。
  2. 消息投递层
    负责从 Outbox 扫描未发送消息,投递 MQ,并更新投递状态。
  3. 事件消费层
    各领域服务消费消息,做幂等、状态推进、失败重试。
  4. 治理运维层
    提供监控、告警、死信、重放、人工补偿、审计追踪能力。

8.2 主题与事件设计

订单系统不要一上来就设计几十个 Topic。更好的方式是先围绕领域事件建模:

  • order.created
  • order.paid
  • order.cancelled
  • inventory.reserved
  • inventory.reserve_failed
  • payment.created
  • payment.succeeded

事件体建议包含:

  • eventId:事件唯一标识
  • eventType:事件类型
  • aggregateId:聚合根 ID,例如订单 ID
  • bizKey:业务键,便于排障
  • occurredAt:业务发生时间
  • traceId:链路追踪 ID
  • payload:业务内容
  • version:事件版本,支持演进

8.3 顺序性设计

并不是所有消息都需要全局有序。生产环境里应当明确:

  • 同一订单有序
  • 不同订单无序

这意味着:

  • Kafka 用 orderId 作为 key,确保同一订单落到同一分区
  • RocketMQ 用顺序消息选择器按 orderId 取模
  • RabbitMQ 用单订单串行语义时需额外设计路由和消费者模型

8.4 高并发下的关键取舍

要达到高可靠 + 高并发,必须做工程化取舍:

  • 核心链路同步步骤尽量少,只保留订单写库和 Outbox 写入
  • 非核心动作全部事件化
  • 重试机制要带退避和上限,不能无限轰炸下游
  • 幂等校验必须落库或落到强一致介质,不能只依赖进程内缓存
  • 死信不是终点,要有重放平台和处理 SOP

九、生产级实现一:基于 Outbox 的可靠投递

下面给出一套更接近生产环境的 Spring Boot + JPA 实现骨架。

9.1 表结构设计

订单表

CREATE TABLE t_order (
    id BIGSERIAL PRIMARY KEY,
    order_id VARCHAR(64) NOT NULL UNIQUE,
    user_id BIGINT NOT NULL,
    amount NUMERIC(18,2) NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Outbox 表

CREATE TABLE t_outbox_event (
    id BIGSERIAL PRIMARY KEY,
    event_id VARCHAR(64) NOT NULL UNIQUE,
    aggregate_type VARCHAR(64) NOT NULL,
    aggregate_id VARCHAR(64) NOT NULL,
    event_type VARCHAR(64) NOT NULL,
    payload JSONB NOT NULL,
    headers JSONB,
    status VARCHAR(16) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_error TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_outbox_status_retry
    ON t_outbox_event(status, next_retry_time, id);

消费幂等表

CREATE TABLE t_message_consume_log (
    id BIGSERIAL PRIMARY KEY,
    consumer_group VARCHAR(128) NOT NULL,
    message_id VARCHAR(64) NOT NULL,
    biz_key VARCHAR(64),
    status VARCHAR(16) NOT NULL,
    consumed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (consumer_group, message_id)
);

9.2 统一事件模型

public record DomainEvent<T>(
        String eventId,
        String eventType,
        String aggregateType,
        String aggregateId,
        String bizKey,
        Instant occurredAt,
        String traceId,
        int version,
        T payload
) {
    public static <T> DomainEvent<T> of(
            String eventType,
            String aggregateType,
            String aggregateId,
            String bizKey,
            String traceId,
            T payload
    ) {
        return new DomainEvent<>(
                UUID.randomUUID().toString(),
                eventType,
                aggregateType,
                aggregateId,
                bizKey,
                Instant.now(),
                traceId,
                1,
                payload
        );
    }
}

9.3 下单服务:业务数据与 Outbox 同事务提交

@Service
@RequiredArgsConstructor
public class OrderApplicationService {

    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    public String createOrder(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();

        Order order = Order.create(
                orderId,
                command.userId(),
                command.amount()
        );
        orderRepository.save(order);

        OrderCreatedPayload payload = new OrderCreatedPayload(
                orderId,
                command.userId(),
                command.amount(),
                command.items()
        );

        DomainEvent<OrderCreatedPayload> event = DomainEvent.of(
                "order.created",
                "Order",
                orderId,
                orderId,
                command.traceId(),
                payload
        );

        OutboxEventEntity outbox = OutboxEventEntity.pending(
                event.eventId(),
                event.aggregateType(),
                event.aggregateId(),
                event.eventType(),
                toJson(event),
                "{\"source\":\"order-service\"}"
        );
        outboxEventRepository.save(outbox);

        return orderId;
    }

    private String toJson(Object value) {
        try {
            return objectMapper.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("serialize event failed", e);
        }
    }
}

这一段非常关键,它保证了:

  • 订单写入成功时,待发送事件一定存在
  • 订单没写入成功时,事件也不会凭空出现

这就把“业务事实”和“消息事实”放进了同一个原子边界。

9.4 Outbox Relay:扫描、投递、确认

生产环境不建议简单用定时任务串行扫表,而应考虑:

  • 批量拉取
  • 分页/分片
  • 状态锁定
  • 重试退避
  • 多实例并发投递

下面给一个简化但工程可用的实现骨架:

@Service
@RequiredArgsConstructor
public class OutboxRelayJob {

    private final OutboxEventRepository outboxEventRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelayString = "${outbox.relay.fixed-delay-ms:500}")
    public void relay() {
        List<OutboxEventEntity> events =
                outboxEventRepository.lockTopNForSend(200, LocalDateTime.now());

        for (OutboxEventEntity event : events) {
            try {
                SendResult<String, String> result = kafkaTemplate.send(
                        "order-domain-event",
                        event.getAggregateId(),
                        event.getPayload()
                ).get(3, TimeUnit.SECONDS);

                event.markSent(
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset()
                );
                outboxEventRepository.save(event);
            } catch (Exception ex) {
                event.markRetry(nextRetryAt(event.getRetryCount()), rootMessage(ex));
                outboxEventRepository.save(event);
            }
        }
    }

    private LocalDateTime nextRetryAt(int retryCount) {
        long seconds = Math.min(300, (long) Math.pow(2, retryCount));
        return LocalDateTime.now().plusSeconds(seconds);
    }

    private String rootMessage(Throwable ex) {
        Throwable current = ex;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current.getMessage();
    }
}

与演示代码相比,这里多了几个生产级要点:

  • 只扫描“待发送且到达下次重试时间”的记录
  • 每次批量拉取,避免全表扫描
  • 发送成功后记录 topic、partition、offset,便于排障
  • 失败后写回重试次数和下次执行时间,形成退避机制

9.5 仓储层:避免多实例重复扫描

如果 Outbox Relay 有多个实例,需要避免同一条消息被同时发送。常见做法包括:

  • SELECT ... FOR UPDATE SKIP LOCKED
  • 基于状态机抢占,例如 PENDING -> SENDING
  • 按主键范围分片
  • 基于一致性哈希做水平切分

例如 PostgreSQL 可用:

SELECT *
FROM t_outbox_event
WHERE status = 'PENDING'
  AND next_retry_time <= NOW()
ORDER BY id
LIMIT 200
FOR UPDATE SKIP LOCKED;

9.6 为什么这套方案在高并发下依然可用

因为下单主链路只做两次本地写操作:

  • 写订单
  • 写 Outbox

两者同库同事务,性能模型清晰,数据库层面容易优化。而真正的 MQ 发送、重试和限流都被移到了异步投递层,这样前台 RT 和成功率会明显更稳。

十、生产级实现二:可靠消费者必须是“幂等 + 手动确认 + 可重试”

消费端是很多系统真正出事故最多的地方。因为只要 ACK 语义和业务语义没对齐,就一定会出现:

  • 重复消费
  • 部分成功
  • 消息堆积
  • 失败放大

10.1 幂等不是 Redis 锁,而是业务去重

很多团队一说幂等就上 Redis setnx 锁,这只能解决“同一时刻并发进入”的一部分问题,解决不了:

  • 服务重启后的重复投递
  • ACK 失败后的重投
  • 延迟重试后的再次到达

真正稳定的方案,是基于消息唯一 ID 或业务唯一键落库去重

10.2 Kafka 消费者生产级骨架

@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryOrderCreatedConsumer {

    private final ConsumeLogRepository consumeLogRepository;
    private final InventoryDomainService inventoryDomainService;
    private final ObjectMapper objectMapper;

    @KafkaListener(
            topics = "order-domain-event",
            groupId = "inventory-consumer-group",
            concurrency = "12",
            containerFactory = "kafkaManualAckListenerFactory"
    )
    @Transactional
    public void onMessage(
            ConsumerRecord<String, String> record,
            Acknowledgment acknowledgment
    ) throws Exception {
        DomainEvent<OrderCreatedPayload> event = objectMapper.readValue(
                record.value(),
                new TypeReference<>() {}
        );

        if (!"order.created".equals(event.eventType())) {
            acknowledgment.acknowledge();
            return;
        }

        boolean inserted = consumeLogRepository.insertIfAbsent(
                "inventory-consumer-group",
                event.eventId(),
                event.bizKey()
        );

        if (!inserted) {
            log.info("duplicate message ignored, eventId={}", event.eventId());
            acknowledgment.acknowledge();
            return;
        }

        try {
            inventoryDomainService.reserve(
                    event.aggregateId(),
                    event.payload().items()
            );

            consumeLogRepository.markSuccess(
                    "inventory-consumer-group",
                    event.eventId()
            );
            acknowledgment.acknowledge();
        } catch (Throwable ex) {
            consumeLogRepository.markFailed(
                    "inventory-consumer-group",
                    event.eventId(),
                    ex.getMessage()
            );
            throw ex;
        }
    }
}

这个实现体现了消费端的核心原则:

  • 先幂等登记,再执行业务
  • 业务成功后再 ACK
  • 业务异常时不 ACK,让框架重试或进入错误处理器

10.3 幂等表插入示例

@Repository
@RequiredArgsConstructor
public class ConsumeLogRepository {

    private final JdbcTemplate jdbcTemplate;

    public boolean insertIfAbsent(String consumerGroup, String messageId, String bizKey) {
        String sql = """
                INSERT INTO t_message_consume_log(consumer_group, message_id, biz_key, status)
                VALUES (?, ?, ?, 'PROCESSING')
                ON CONFLICT (consumer_group, message_id) DO NOTHING
                """;
        return jdbcTemplate.update(sql, consumerGroup, messageId, bizKey) > 0;
    }

    public void markSuccess(String consumerGroup, String messageId) {
        jdbcTemplate.update("""
                UPDATE t_message_consume_log
                   SET status = 'SUCCESS', consumed_at = CURRENT_TIMESTAMP
                 WHERE consumer_group = ? AND message_id = ?
                """, consumerGroup, messageId);
    }

    public void markFailed(String consumerGroup, String messageId, String error) {
        jdbcTemplate.update("""
                UPDATE t_message_consume_log
                   SET status = 'FAILED'
                 WHERE consumer_group = ? AND message_id = ?
                """, consumerGroup, messageId);
    }
}

10.4 库存服务中的状态机设计

库存操作不要直接写成“收到消息就减库存”,更稳妥的做法是:

  • INIT
  • RESERVED
  • CONFIRMED
  • RELEASED

也就是说,订单创建后先冻结库存,支付成功后再确认扣减,订单取消则释放冻结库存。这样即使链路抖动,也更容易做状态收敛和补偿。

十一、生产级实现三:RocketMQ 事务消息如何正确使用

如果项目选用 RocketMQ,事务消息是非常实用的能力。它的核心流程是:

  1. 生产者发送半消息(对消费者不可见)
  2. 执行本地事务
  3. 根据本地事务结果提交或回滚半消息
  4. 如果 Broker 未收到明确结果,则主动回查生产者

11.1 事务消息生产端示例

@Service
@RequiredArgsConstructor
public class OrderTransactionMessageService {

    private final RocketMQTemplate rocketMQTemplate;

    public void createOrder(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();
        OrderCreatedPayload payload = new OrderCreatedPayload(
                orderId,
                command.userId(),
                command.amount(),
                command.items()
        );

        Message<OrderCreatedPayload> message = MessageBuilder
                .withPayload(payload)
                .setHeader("eventType", "order.created")
                .setHeader(RocketMQHeaders.KEYS, orderId)
                .build();

        rocketMQTemplate.sendMessageInTransaction(
                "order-tx-producer-group",
                "order-created-topic",
                message,
                command
        );
    }
}

11.2 本地事务监听器

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "order-tx-producer-group")
@RequiredArgsConstructor
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    private final OrderRepository orderRepository;

    @Override
    @Transactional
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            CreateOrderCommand command = (CreateOrderCommand) arg;
            OrderCreatedPayload payload = (OrderCreatedPayload) msg.getPayload();

            Order order = Order.create(
                    payload.orderId(),
                    command.userId(),
                    command.amount()
            );
            orderRepository.save(order);

            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception ex) {
            log.error("execute local transaction failed", ex);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        OrderCreatedPayload payload = (OrderCreatedPayload) msg.getPayload();
        boolean exists = orderRepository.existsByOrderId(payload.orderId());
        return exists
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
    }
}

11.3 使用事务消息的几个架构注意点

  • 回查接口必须幂等且极轻量,通常只查本地事务结果
  • 不要在本地事务里做复杂远程调用
  • 事务状态一定要可审计,否则排障成本极高
  • 即使使用事务消息,消费端仍要做幂等

十二、RabbitMQ 可靠投递如何落到生产环境

RabbitMQ 适合业务命令型消息,但要想真正可靠,必须把以下配置和编码模式一起用起来。

12.1 生产端关键配置

spring:
  rabbitmq:
    host: rabbitmq.prod
    port: 5672
    username: app_user
    password: secret
    virtual-host: /trade
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 8
        max-concurrency: 32
        prefetch: 100

12.2 生产者发送示例

@Service
@RequiredArgsConstructor
public class RabbitOrderEventPublisher {

    private final RabbitTemplate rabbitTemplate;

    public void publish(String exchange, String routingKey, String payload, String eventId) {
        CorrelationData correlationData = new CorrelationData(eventId);

        rabbitTemplate.convertAndSend(
                exchange,
                routingKey,
                payload,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    message.getMessageProperties().setMessageId(eventId);
                    return message;
                },
                correlationData
        );
    }
}

12.3 Confirm 与 Return 必须分清楚

  • Confirm Callback:Broker 是否收到消息
  • Return Callback:消息是否成功路由到队列

很多系统只配 Confirm,不配 Return,结果消息到了交换机却没队列接收,最终仍然等价于丢消息。

12.4 消费端建议

  • 手动 ACK
  • 失败进入死信,不要无限原地重回队列
  • 基于延迟队列做阶梯重试,例如 1 分钟、5 分钟、30 分钟
  • 对热点业务控制 prefetch,避免单实例囤积过多未确认消息

十三、Kafka 生产级可靠性参数与消费策略

13.1 Producer 配置建议

spring:
  kafka:
    bootstrap-servers: kafka-1:9092,kafka-2:9092,kafka-3:9092
    producer:
      acks: all
      retries: 10
      batch-size: 65536
      buffer-memory: 67108864
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        compression.type: zstd
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      max-poll-records: 500
      properties:
        isolation.level: read_committed

13.2 关键解释

  • acks=all:只有所有 ISR 副本满足条件后才认为成功
  • enable.idempotence=true:减少因重试导致的生产端重复
  • enable-auto-commit=false:位点提交交给业务处理结果控制
  • read_committed:如果使用事务生产者,消费者只读已提交事务消息

13.3 Kafka 高并发下的重点不是“能发多快”,而是“分区是否够用”

Kafka 的并行上限通常取决于:

  • Topic 分区数
  • 消费组实例数
  • 单条消息处理时长
  • 下游数据库/缓存吞吐能力

如果库存服务的消费组有 12 个实例,但 Topic 只有 6 个分区,那么并发上限就是 6,不是 12。

因此容量规划时必须把“业务吞吐目标”转换成“分区数、消费者实例数、单实例处理能力”的组合设计。

十四、高并发工程化升级:不仅要可靠,还要扛流量

14.1 入口削峰:写路径做最小化

高峰期最容易崩的是同步长链路。正确做法是:

  • 请求入口快速校验
  • 核心交易数据落库
  • 事件异步化
  • 非核心动作延迟执行

也就是说,在大促下单场景中,真正同步保留在主链路里的,应该只有:

  • 风险前置校验
  • 订单写库
  • Outbox 写库

而库存冻结、支付单创建、营销通知、积分发放等都走事件链路。

14.2 消费端扩展:水平扩容不是无限有效

消费者扩容的收益受以下因素影响:

  • Topic/Queue/Partition 数量
  • 数据库连接池
  • 下游接口限流阈值
  • 单条消息处理成本

如果一个消息要串行调用 3 个下游 RPC,即使消费者线程再多,也会很快卡在网络和下游资源上。因此消费逻辑的设计原则是:

  • 单条消息尽量只做一个核心动作
  • 把复杂编排拆成多个事件阶段
  • 失败快速返回,交给重试系统

14.3 批量消费与批量落库

对于日志、埋点、风控等非强事务事件,可以批量化以提升吞吐:

  • Kafka 批量拉取后批量写入
  • RabbitMQ 开启合理 prefetch,减少网络往返
  • Outbox 扫描器批量发送
  • 幂等表批量插入

但对订单、库存、支付等交易型消息,要谨慎使用过大的批量,否则单批失败的回滚和排障成本很高。

14.4 退避重试要有限制

一个成熟的重试策略通常包含:

  • 最大重试次数
  • 指数退避
  • 抖动因子
  • 可重试错误白名单
  • 不可重试错误直接死信

例如:

  • 库存服务超时:重试
  • 参数非法:不重试
  • 商品已下架:业务拒绝,直接结束
  • 数据库主从切换瞬时失败:短暂重试

14.5 限流、熔断与隔离

如果消费端下游依赖不稳定,继续高速消费只会扩大事故面。生产环境里建议:

  • 对下游 RPC 配置限流与熔断
  • 区分核心 Topic 与普通 Topic
  • 对慢消费者单独隔离消费组
  • 对 Outbox Relay 配置发送速率上限

十五、生产级监控与治理:没有观测,就没有可靠性

一个系统哪怕设计再好,如果没有观测能力,也无法证明它可靠。

15.1 必须监控的核心指标

生产侧

  • 消息发送成功率
  • Broker ACK 延迟
  • Producer 重试次数
  • Outbox 待发送数量
  • Outbox 最老未发送消息年龄

Broker 侧

  • Topic/Queue 堆积量
  • 分区 leader 切换次数
  • ISR 变化
  • 磁盘使用率
  • 网络吞吐

消费侧

  • 消费成功率
  • 单条消息处理耗时
  • 重试次数
  • 死信数量
  • 消费延迟
  • Rebalance 次数

业务侧

  • 订单创建成功但未冻结库存数
  • 支付成功但订单状态未推进数
  • 死信积压超时数
  • 人工补偿完成率

15.2 推荐告警项

告警项 建议阈值 说明
Outbox 未发送积压 超过 5 分钟持续增长 说明投递层异常
Topic 消费延迟 超过 SLA 说明下游处理能力不足
死信速率突增 环比激增 说明代码或依赖异常
重试率 > 5% 持续 10 分钟 说明系统进入不稳定状态
消费失败率 > 1% 核心链路立即告警 交易链路必须高敏感

15.3 可观测性链路建议

每条消息都建议带上:

  • traceId
  • eventId
  • aggregateId
  • producerApp
  • consumerGroup

这样才能实现:

  • 从一次投诉订单反查整条事件链路
  • 从一条死信追溯生产端、Broker、消费端全过程
  • 从链路追踪系统定位哪个环节 RT 飙升

十六、灾备与恢复:可靠性不是“不出错”,而是“出错也能恢复”

16.1 死信队列不是垃圾桶

很多团队把 DLQ 配出来就算完成,其实这只做了 30%。真正完整的死信治理要包含:

  • 死信分类
  • 根因标签
  • 自动重放规则
  • 人工审核入口
  • 操作审计日志

16.2 建议建设“消息补偿平台”

平台至少要支持:

  • 按订单号、事件 ID 查询消息轨迹
  • 查看生产状态、消费状态、重试次数、失败原因
  • 手动重发
  • 重发前修改目标 Topic/Queue 或延迟级别
  • 批量恢复某一时间窗口的异常消息

16.3 容灾演练必须常态化

至少要演练这几类故障:

  • Broker 单节点宕机
  • 分区 leader 切换
  • 消费者批量重启
  • 数据库闪断
  • 下游库存服务不可用
  • Outbox Relay 长时间停止

演练目标不是“系统没报错”,而是确认:

  • 数据是否最终收敛
  • 告警是否及时触发
  • 补偿是否可执行
  • RTO/RPO 是否满足要求

十七、常见误区:很多系统不是败给 MQ,而是败给认知偏差

17.1 误区一:Broker 持久化了,就等于业务可靠

错。Broker 持久化只解决“消息存住了”,不解决:

  • 数据与消息双写一致性
  • 消费端重复执行
  • 业务状态不一致

17.2 误区二:用了 Redis 锁,就等于幂等

错。Redis 锁解决的是并发竞争,不等于持久化幂等语义。真正可靠的幂等必须基于业务唯一键、消息唯一键和可恢复的存储介质。

17.3 误区三:无限重试一定能成功

错。无限重试经常会把局部故障放大成系统雪崩。正确做法是:

  • 白名单重试
  • 指数退避
  • 超限死信
  • 人工补偿

17.4 误区四:所有消息都要严格顺序

错。全局顺序的代价极高,大多数业务只需要“同一业务键局部有序”。

17.5 误区五:Exactly Once 是默认目标

错。对大多数业务系统,更现实、更稳妥的目标是:

至少一次投递 + 幂等消费 + 状态收敛 + 可补偿

十八、案例落地:一条订单消息如何在生产环境闭环

下面用完整链路把前面的设计串起来。

18.1 场景

用户下单成功后,需要完成:

  • 冻结库存
  • 创建支付单
  • 发送营销通知

18.2 处理流程

  1. 订单服务收到请求,校验参数和风控规则。
  2. 在同一个数据库事务里写入 t_ordert_outbox_event
  3. Outbox Relay 扫描到 order.created 事件,投递到 order-domain-event Topic。
  4. 库存服务消费该事件,先插入消费幂等表,成功后冻结库存,再 ACK。
  5. 支付服务消费同一事件,创建支付单并 ACK。
  6. 营销服务消费后调用发券逻辑,若发券系统超时则进入延迟重试。
  7. 某个服务重试超过阈值则进入死信队列,并触发告警。
  8. 运维或业务通过补偿平台查询事件轨迹,确认根因后执行重放。

18.3 为什么这套流程能抗住高并发与故障

  • 下单主链路极短,只做本地事务
  • 所有异步步骤都支持失败重试
  • 消费重复不会重复扣库存或重复建支付单
  • 每个节点都有观测点
  • 出现问题时不是“数据黑洞”,而是“可恢复状态”

十九、文章级总结:一套可执行的设计准则

如果你要在团队里推动消息可靠投递体系建设,可以直接按下面这套准则落地:

19.1 核心设计准则

  1. 核心交易链路优先解决“双写一致性”,推荐 Outbox 或事务消息。
  2. 消费端默认按 At Least Once 设计,靠幂等保证业务正确。
  3. ACK 必须晚于业务成功,不能“先确认后处理”。
  4. 重试必须有边界,失败最终进入死信和补偿流程。
  5. 顺序性只保证到业务键级别,不追求全局顺序。
  6. 可靠性必须可观测、可审计、可重放。

19.2 技术选型准则

  • 交易业务强一致诉求高:优先 RocketMQ 事务消息或 Outbox
  • 通用业务消息、多路由能力强:RabbitMQ
  • 高吞吐事件流、大数据链路:Kafka

19.3 工程实施准则

  • 数据库表结构先设计好状态字段、重试字段、索引
  • 统一事件模型、统一 traceId、统一幂等方案
  • 建立 DLQ 和补偿平台,而不是只依赖日志排障
  • 把堆积、延迟、失败率和死信纳入 SLO 指标

二十、结语:可靠投递不是中间件特性,而是系统工程能力

真正成熟的消息可靠性体系,从来不是“我会配 RabbitMQ Confirm”或者“我会用 Kafka acks=all”这么简单。

它本质上是一个系统工程问题,涉及:

  • 分布式事务边界设计
  • 状态机建模
  • 幂等语义设计
  • 高并发吞吐优化
  • 死信补偿治理
  • 监控审计与故障演练

如果只盯着 MQ 参数,很容易得到一个“看起来可靠”的系统;如果从业务事实、失败模型和恢复能力出发,才能真正建设出一个“即使出错也能收敛”的系统。

最后给一个可以直接带走的结论:

在绝大多数互联网业务里,消息可靠投递的最优解不是追求神话般的 Exactly Once,而是用本地事务或事务消息保证生产侧一致性,用幂等和状态机保证消费侧正确性,用重试、死信和补偿保证系统最终收敛。

这,才是消息队列可靠投递从理论走向生产实践的完整答案。你可以在 云栈社区 找到更多关于分布式系统和架构设计的深度讨论与实战案例。




上一篇:Java代码审计:全链路工作流方法论与实战指南(含静态分析与LLM辅助)
下一篇:MySQL CPU 飙升500%的紧急应对与根治策略:从问题定位到架构治理
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-7 16:36 , Processed in 0.739318 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表