找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1964

积分

0

好友

280

主题
发表于 6 天前 | 查看: 18| 回复: 0

消息队列不是技术选型题,而是一道架构思维题。选错不是语法错误,是逻辑错误。

一、从“能用就行”到“必须可靠”的认知升级

1.1 第一阶段:消息队列 = 异步工具

早期认知:
“消息队列就是把同步调用改成异步,提升性能嘛。”

// 早期的“能用就行”写法
@Service
public class OrderServiceV1 {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// 创建订单后,发个消息通知其他系统
public void createOrder(Order order){
// 保存订单到数据库
        orderRepository.save(order);

// 发消息(成功失败不管)
        kafkaTemplate.send("order-created", 
                          order.getId().toString(),
                          order.toJson());

// 继续处理其他逻辑
// ...
    }
}

问题暴露:

  1. 消息可能丢失(网络抖动、Broker重启)
  2. 消息可能重复(生产者重试)
  3. 消费者可能崩溃(消息积压)
  4. 顺序可能错乱(多分区)

1.2 第二阶段:消息队列 = 数据管道

中期认知:
“消息队列是系统间的数据通道,要保证数据不丢。”

// 中期的“保证可靠”写法
@Service
public class OrderServiceV2 {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Transactional
public void createOrder(Order order){
// 1. 保存订单到数据库(本地事务)
        orderRepository.save(order);

// 2. 发送消息(事务消息)
try {
            kafkaTemplate.executeInTransaction(t -> {
                t.send("order-created", 
                      order.getId().toString(),
                      order.toJson());
return true;
            });
        } catch (Exception e) {
// 事务回滚
throw new RuntimeException("消息发送失败", e);
        }
    }
}

仍然存在的问题:

  1. 数据库事务和消息事务是两阶段,可能不一致
  2. 消费者幂等性问题没解决
  3. 消息积压时的降级策略缺失

1.3 第三阶段:消息队列 = 架构核心

现在的认知:
“消息队列是系统架构的骨架,定义了数据流向和处理模式。”

// 现在的“架构思维”写法
@Service
@Slf4j
public class OrderServiceV3 {

@Autowired
private OrderEventPublisher eventPublisher;

@Transactional
public Long createOrder(OrderCreateCommand command){
// 1. 验证业务规则
        validateBusinessRules(command);

// 2. 创建订单聚合根
        Order order = Order.create(command);

// 3. 保存到数据库(领域事件会自动发布)
        orderRepository.save(order);

// 4. 返回订单ID(消息由基础设施层保证投递)
return order.getId();
    }
}

// 领域事件发布器(基础设施层)
@Component
@Slf4j
public class OrderEventPublisher {

@Autowired
private DomainEventStore eventStore;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderCreatedEvent event){
// 1. 存储到事件存储(本地事务)
        eventStore.save(event);

// 2. 异步发送到消息队列
        CompletableFuture.runAsync(() -> {
try {
                sendToMessageQueue(event);
            } catch (Exception e) {
                log.error("消息发送失败,事件已持久化,可通过补偿任务重试", e);
// 记录失败,后续有补偿机制
                eventStore.markAsFailed(event.getId(), e.getMessage());
            }
        });
    }

// 定时补偿任务
@Scheduled(fixedDelay = 60000)
public void retryFailedEvents(){
        List<DomainEvent> failedEvents = eventStore.findFailedEvents();
// 重试发送...
    }
}

二、四大核心问题:每个都是架构决策点

2.1 问题一:消息丢失 vs 消息重复,你选哪个?

错误思维: “我要保证既不丢失也不重复”
现实: 分布式系统中,这是CAP的变种——你只能优先保证一个。

架构决策树:

消息可靠性要求高(支付、交易)?
├── 是 → 选择:允许重复,不允许丢失
│   ├── 解决方案:消费者幂等 + 生产者重试无限次
│   └── 代价:需要幂等设计和重复检测
│
└── 否 → 选择:允许丢失,不允许重复
    ├── 解决方案:生产者acks=1 + 消费者自动提交
    └── 代价:可能丢失少量消息

实战配置:

// 场景A:支付回调(不允许丢失)
@Configuration
public class PaymentProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");          // 所有副本确认
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证顺序
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等生产者
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-producer");

return new DefaultKafkaProducerFactory<>(props);
    }
}

// 场景B:用户行为日志(允许少量丢失)
@Configuration
public class LogProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "1");           // Leader确认即可
        props.put(ProducerConfig.RETRIES_CONFIG, 3);          // 重试3次
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);      // 批量发送
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);   // 批量大小

return new DefaultKafkaProducerFactory<>(props);
    }
}

2.2 问题二:顺序保证 vs 并发性能,你如何权衡?

错误案例: 为了性能开16个分区,结果订单状态乱了。

架构解决方案:

// 方案A:严格顺序(单分区)
public class StrictOrderProcessor {

// 关键:相同的订单ID总是路由到同一个分区
private int calculatePartition(String orderId, int partitionCount){
return Math.abs(orderId.hashCode()) % partitionCount;
    }

// 生产者发送时指定分区
public void sendOrderEvent(OrderEvent event){
int partition = calculatePartition(event.getOrderId(), 16);
        kafkaTemplate.send("order-events", partition, 
                          event.getOrderId(), event.toJson());
    }
}

// 方案B:局部顺序(业务拆分)
public class PartialOrderProcessor {

// 不同业务类型走不同Topic,各自保证顺序
public void sendEvent(DomainEvent event){
        String topic = resolveTopicByEventType(event.getType());
        kafkaTemplate.send(topic, event.getAggregateId(), event.toJson());
    }

private String resolveTopicByEventType(String eventType){
switch (eventType) {
case "ORDER_CREATED":
case "ORDER_PAID":
case "ORDER_SHIPPED":
return "order-lifecycle-events";  // 订单生命周期,需要顺序

case "ORDER_VIEWED":
case "ORDER_SHARED":  
return "order-activity-events";   // 订单活动,不需要严格顺序

default:
return "general-events";
        }
    }
}

2.3 问题三:实时处理 vs 批量处理,你设计对了吗?

错误做法: 所有消息都实时处理,结果DB被打爆。

架构思维: 根据业务特点选择处理模式。

// 实时处理模式(支付回调)
@Component
@Slf4j
public class RealTimePaymentProcessor {

@KafkaListener(topics = "payment-callback", concurrency = "3")
@Transactional
public void processPayment(PaymentCallback callback){
// 立即处理,不能延迟
        paymentService.confirmPayment(callback);
        log.info("实时处理支付回调: {}", callback.getPaymentId());
    }
}

// 批量处理模式(数据统计)
@Component
@Slf4j
public class BatchStatProcessor {

// 批量监听,每100条或每5秒处理一次
@KafkaListener(topics = "user-behavior", 
                  containerFactory = "batchContainerFactory")
@Transactional
public void processUserBehavior(List<ConsumerRecord<String, String>> records){
if (records.isEmpty()) {
return;
        }

// 批量插入数据库
        List<UserBehavior> behaviors = records.stream()
            .map(record -> convertToBehavior(record.value()))
            .collect(Collectors.toList());

// 使用JPA批量保存
        userBehaviorRepository.saveAll(behaviors);

        log.info("批量处理用户行为数据: {}条", records.size());
    }

// 批量容器配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
batchContainerFactory(){

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);  // 开启批量监听
        factory.getContainerProperties().setPollTimeout(5000);  // 5秒超时
        factory.getContainerProperties().setAckMode(AckMode.BATCH);

return factory;
    }
}

2.4 问题四:同步调用 vs 事件驱动,你理解本质了吗?

三年后才明白的区别:

// 传统同步架构(服务耦合)
@Service
public class SyncOrderService {

public void createOrder(Order order){
// 1. 保存订单
        orderRepository.save(order);

// 2. 同步调用库存服务(强依赖)
        stockService.deduct(order.getItems());

// 3. 同步调用积分服务(强依赖)  
        pointService.addPoints(order.getUserId(), order.getAmount());

// 4. 同步发送通知(强依赖)
        notificationService.sendSms(order.getUserPhone());

// 问题:任何一个服务挂掉,整个流程失败
// 问题:响应时间 = 所有服务耗时之和
    }
}

// 事件驱动架构(服务解耦)
@Service
@Transactional
@Slf4j  
public class EventDrivenOrderService {

@Autowired
private DomainEventPublisher eventPublisher;

public void createOrder(Order order){
// 1. 保存订单(聚合根)
        orderRepository.save(order);

// 2. 发布领域事件
// 注意:事件发布在事务提交后执行
        eventPublisher.publish(new OrderCreatedEvent(order));

// 响应时间 ≈ 数据库操作时间
// 库存、积分、通知等由各自的事件处理器异步处理
// 单个服务挂掉不影响主流程
    }
}

// 事件处理器(独立服务,可单独部署)
@Component
@Slf4j
public class OrderCreatedEventHandler {

@EventListener
@Async("orderProcessorExecutor")
public void handleOrderCreated(OrderCreatedEvent event){
try {
// 扣减库存
            stockService.deduct(event.getItems());

// 增加积分
            pointService.addPoints(event.getUserId(), event.getAmount());

// 发送通知
            notificationService.sendSms(event.getUserPhone());

        } catch (Exception e) {
// 单个处理器失败不影响其他处理器
            log.error("处理订单创建事件失败", e);
// 进入死信队列,有补偿机制
        }
    }
}

三、消息队列的四种架构模式

3.1 模式一:事件通知(最常用)

场景: 状态变化通知

// 订单状态变化时通知相关方
public class OrderStatusChangedEvent {
private String orderId;
private String oldStatus;
private String newStatus;
private Date changedTime;
}

// 多个消费者独立处理
// 1. 库存系统:监听发货通知,释放库存占用
// 2. 客服系统:监听异常订单,自动创建工单
// 3. 财务系统:监听完成订单,生成结算单

3.2 模式二:事件溯源(最强大)

场景: 需要完整审计追踪

// 存储所有状态变化事件
@Entity
public class OrderAggregate {

@Id
private String orderId;

// 当前状态(可从事件重建)
private String currentStatus;

// 所有事件列表
@OneToMany(cascade = CascadeType.ALL)
private List<OrderEvent> events = new ArrayList<>();

// 应用事件,更新状态
public void applyEvent(OrderEvent event){
this.events.add(event);
this.currentStatus = event.getNewStatus();
    }

// 从事件重建聚合(用于数据修复)
public static OrderAggregate rebuildFromEvents(List<OrderEvent> events){
        OrderAggregate order = new OrderAggregate();
        events.forEach(order::applyEvent);
return order;
    }
}

3.3 模式三:消息总线(最解耦)

场景: 微服务间通信

// 统一事件格式
public abstract class DomainEvent {
private String eventId;
private String aggregateType;  // Order, User, Product
private String aggregateId;
private String eventType;
private Date occurredAt;
private Map<String, Object> payload;
}

// 统一事件处理器接口
public interface EventHandler<T extends DomainEvent> {
boolean canHandle(String eventType);
void handle(T event);
}

// 事件分发器
@Component
public class EventDispatcher {

private final Map<String, List<EventHandler>> handlers = new HashMap<>();

@KafkaListener(topics = "#{'${event.topics}'.split(',')}")
public void dispatch(String message){
        DomainEvent event = deserialize(message);

        List<EventHandler> eventHandlers = handlers.get(event.getEventType());
if (eventHandlers != null) {
            eventHandlers.forEach(handler -> {
try {
                    handler.handle(event);
                } catch (Exception e) {
                    log.error("事件处理失败", e);
// 不影响其他处理器
                }
            });
        }
    }
}

3.4 模式四:流处理(最实时)

场景: 实时数据分析

// 使用Kafka Streams实时统计
public class OrderStatisticsStream {

@Bean
public KStream<String, OrderEvent> orderStream(StreamsBuilder builder){
        KStream<String, OrderEvent> stream = builder
            .stream("order-events", Consumed.with(Serdes.String(), orderEventSerde()));

// 实时统计每分钟订单数
        stream
            .groupBy((key, value) -> {
// 按分钟分组
                Instant instant = value.getCreatedAt().toInstant();
return instant.atZone(ZoneId.systemDefault())
                    .truncatedTo(ChronoUnit.MINUTES)
                    .toString();
            })
            .count(Materialized.as("orders-per-minute"))
            .toStream()
            .to("order-stats-minute", Produced.with(Serdes.String(), Serdes.Long()));

// 实时统计每个用户的订单金额
        stream
            .filter((key, value) -> "ORDER_PAID".equals(value.getEventType()))
            .groupBy((key, value) -> value.getUserId())
            .aggregate(
                () -> 0.0,
                (userId, event, total) -> total + event.getAmount(),
                Materialized.as("user-order-total")
            )
            .toStream()
            .to("user-order-totals", Produced.with(Serdes.String(), Serdes.Double()));

return stream;
    }
}

四、实战:从零设计一个可靠的消息系统

4.1 第一步:明确业务需求(不要从技术出发!)

需求分析模板:

业务场景:订单支付回调
可靠性要求:极高(不允许丢失或重复扣款)
顺序要求:中等(同一订单的支付、退款需要顺序)
延迟要求:高(5秒内必须处理)
吞吐量:1000 TPS
数据大小:平均2KB

业务场景:用户行为日志
可靠性要求:低(允许丢失5%)
顺序要求:低(不需要)
延迟要求:低(5分钟内处理即可)
吞吐量:10000 TPS
数据大小:平均500B

4.2 第二步:选择合适的技术栈

技术选型决策矩阵:

public class MessageQueueSelector {

public MessageQueue select(Requirements req){
if (req.getReliability() == Reliability.HIGH) {
if (req.getThroughput() > 10000) {
return MessageQueue.KAFKA;  // 高可靠+高吞吐
            } else {
return MessageQueue.ROCKETMQ;  // 高可靠+事务消息
            }
        } else if (req.getOrdering() == Ordering.STRICT) {
return MessageQueue.PULSAR;  // 严格顺序保证
        } else if (req.getLatency() == Latency.MILLISECOND) {
return MessageQueue.RABBITMQ;  // 低延迟
        } else {
return MessageQueue.KAFKA;  // 默认选择
        }
    }
}

// 实际项目中的多队列架构
@Configuration
public class MultiQueueConfig {

// 支付回调:RocketMQ(事务消息)
@Bean
public RocketMQTemplate paymentTemplate(){
// 配置事务消息支持
    }

// 订单事件:Kafka(高吞吐)
@Bean
public KafkaTemplate<String, String> orderTemplate(){
// 配置高吞吐
    }

// 实时通知:RabbitMQ(低延迟)
@Bean
public RabbitTemplate notificationTemplate(){
// 配置低延迟
    }
}

4.3 第三步:设计生产者架构

生产者的四个核心责任:

@Component
@Slf4j
public class ReliableProducer {

// 责任1:消息本地持久化(防丢失)
@Autowired
private MessageStore messageStore;

// 责任2:异步发送(不阻塞业务)
@Autowired
private ExecutorService asyncExecutor;

// 责任3:失败重试(保证送达)
@Autowired
private RetryPolicy retryPolicy;

// 责任4:监控埋点(可观测性)
@Autowired
private MetricsRecorder metricsRecorder;

public CompletableFuture<SendResult> sendReliably(String topic, 
                                                         String key, 
                                                         String message){
// 1. 生成唯一ID(用于去重)
        String messageId = generateMessageId();

// 2. 本地持久化(WAL机制)
        MessageRecord record = new MessageRecord(messageId, topic, key, message);
        messageStore.save(record);

// 3. 异步发送
return CompletableFuture.supplyAsync(() -> {
            SendResult result = null;
            Exception lastException = null;

// 按照重试策略发送
for (int attempt = 1; attempt <= retryPolicy.getMaxAttempts(); attempt++) {
try {
                    result = doSend(topic, key, message);

// 发送成功,更新状态
                    messageStore.markAsSent(messageId);
                    metricsRecorder.recordSuccess(topic);

return result;

                } catch (Exception e) {
                    lastException = e;
                    log.warn("消息发送失败,第{}次重试", attempt, e);

// 记录失败
                    messageStore.markAsFailed(messageId, e.getMessage());
                    metricsRecorder.recordFailure(topic);

// 等待重试间隔
if (attempt < retryPolicy.getMaxAttempts()) {
                        sleep(retryPolicy.getBackoff(attempt));
                    }
                }
            }

// 所有重试都失败
            messageStore.markAsDead(messageId);
throw new MessageSendException("消息发送失败", lastException);

        }, asyncExecutor);
    }

// 定时任务:扫描失败消息并重试
@Scheduled(fixedDelay = 60000)
public void retryFailedMessages(){
        List<MessageRecord> failedMessages = messageStore.findFailedMessages();

        failedMessages.forEach(record -> {
            sendReliably(record.getTopic(), 
                        record.getKey(), 
                        record.getMessage());
        });
    }
}

4.4 第四步:设计消费者架构

消费者的四个核心责任:

@Component
@Slf4j
public class ReliableConsumer {

// 责任1:幂等处理(防重复)
@Autowired
private IdempotenceChecker idempotenceChecker;

// 责任2:死信处理(防阻塞)
@Autowired
private DeadLetterHandler deadLetterHandler;

// 责任3:流量控制(防打垮)
@Autowired
private RateLimiter rateLimiter;

// 责任4:手动提交(防丢失)
private final Acknowledgment ack;

@KafkaListener(topics = "order-events")
public void consume(ConsumerRecord<String, String> record){
        String messageId = extractMessageId(record);

// 1. 幂等检查(已处理过的消息直接跳过)
if (idempotenceChecker.isProcessed(messageId)) {
            log.info("消息已处理过,跳过: {}", messageId);
            ack.acknowledge();  // 仍然提交offset
return;
        }

try {
// 2. 流量控制(防止突发流量打垮系统)
if (!rateLimiter.tryAcquire()) {
                log.warn("达到流量限制,暂停消费");
                Thread.sleep(1000);
// 不提交offset,下次继续消费这条消息
return;
            }

// 3. 业务处理
            processBusinessLogic(record.value());

// 4. 记录已处理(幂等标记)
            idempotenceChecker.markAsProcessed(messageId);

// 5. 手动提交offset(处理成功才提交)
            ack.acknowledge();

            log.info("消息处理成功: {}", messageId);

        } catch (BusinessException e) {
// 业务异常(如余额不足),进入死信队列
            log.error("业务处理失败,进入死信队列", e);
            deadLetterHandler.sendToDlq(record, e.getMessage());
            ack.acknowledge();  // 提交offset,不再重试

        } catch (SystemException e) {
// 系统异常(如数据库连接失败),需要重试
            log.error("系统处理失败,等待重试", e);
// 不提交offset,让消息重新消费
throw e;

        } catch (Exception e) {
// 未知异常
            log.error("未知异常", e);
            deadLetterHandler.sendToDlq(record, "UNKNOWN_ERROR");
            ack.acknowledge();
        }
    }

// 批量消费版本
@KafkaListener(topics = "user-behavior", 
                  containerFactory = "batchContainerFactory")
public void consumeBatch(List<ConsumerRecord<String, String>> records){
// 批量处理逻辑
        List<CompletableFuture<Void>> futures = records.stream()
            .map(record -> CompletableFuture.runAsync(() -> {
try {
                    consume(record);
                } catch (Exception e) {
                    log.error("批量处理中单条消息失败", e);
                }
            }))
            .collect(Collectors.toList());

// 等待所有消息处理完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenRun(() -> {
// 批量提交offset
                ack.acknowledge();
            })
            .exceptionally(e -> {
                log.error("批量处理失败", e);
return null;
            });
    }
}

4.5 第五步:设计监控与运维体系

监控指标清单:

# 生产者监控
producer_metrics:
- send_success_rate:发送成功率
- send_latency_p99:发送延迟P99
- retry_count:重试次数
- buffer_usage:缓冲区使用率

# 消费者监控  
consumer_metrics:
- consume_lag:消费延迟(消息积压)
- process_success_rate:处理成功率
- process_latency_p99:处理延迟P99
- dead_letter_count:死信数量

# 系统监控
system_metrics:
- broker_health:Broker健康状态
- disk_usage:磁盘使用率
- network_io:网络IO
- connection_count:连接数

告警规则配置:

@Component
public class MessageQueueAlerts {

// 规则1:消息积压告警
@Scheduled(fixedRate = 60000)
public void checkConsumerLag(){
        Map<TopicPartition, Long> lags = getConsumerLag();

        lags.forEach((tp, lag) -> {
if (lag > 10000) {  // 积压超过1万条
                alertService.sendAlert(
"CONSUMER_LAG_HIGH",
                    Map.of(
"topic", tp.topic(),
"partition", tp.partition(),
"lag", lag.toString()
                    )
                );

// 自动扩容消费者
if (canScaleOut(tp.topic())) {
                    scaleOutConsumer(tp.topic());
                }
            }
        });
    }

// 规则2:发送失败告警
@EventListener
public void handleSendFailure(SendFailureEvent event){
if (event.getFailureCount() > 10) {
            alertService.sendAlert(
"PRODUCER_SEND_FAILURE",
                Map.of(
"topic", event.getTopic(),
"failureCount", String.valueOf(event.getFailureCount()),
"lastError", event.getLastError()
                )
            );
        }
    }

// 规则3:死信过多告警
@Scheduled(fixedRate = 300000)  // 5分钟
public void checkDeadLetter(){
long deadLetterCount = deadLetterService.getCountLastHour();

if (deadLetterCount > 100) {
            alertService.sendAlert(
"DEAD_LETTER_HIGH",
                Map.of("count", String.valueOf(deadLetterCount))
            );
        }
    }
}

五、血泪教训:这三年我总结的10条军规

5.1 设计阶段

  1. 先定义消息契约,再写代码
// ❌ 错误:直接发JSON字符串
kafkaTemplate.send("order-events", orderId, order.toString());

// ✅ 正确:定义事件契约
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderCreatedEvent implements DomainEvent{
private String eventId;
private String orderId;
private String userId;
private BigDecimal amount;
private Date createdAt;
private String eventType = "ORDER_CREATED";
private int version = 1;  // 版本号,用于兼容性
}
  1. 一个Topic只做一件事
// ❌ 错误:订单所有事件都发到一个Topic
// order-events: 包含创建、支付、发货、退款...

// ✅ 正确:按业务语义拆分Topic
// order-lifecycle-events: 创建、支付、发货(需要顺序)
// order-activity-events: 浏览、分享、收藏(不需要顺序)
// order-compensation-events: 退款、售后(补偿操作)

5.2 开发阶段

  1. 生产者必须考虑失败场景
// ❌ 错误:发送后不管
kafkaTemplate.send(topic, message);

// ✅ 正确:处理发送结果
ListenableFuture<SendResult<String, String>> future = 
    kafkaTemplate.send(topic, message);

future.addCallback(
    result -> log.info("发送成功: {}", result),
    ex -> {
        log.error("发送失败", ex);
// 1. 记录到本地存储
// 2. 触发告警
// 3. 启动补偿任务
    }
);
  1. 消费者必须实现幂等
// 幂等方案1:数据库唯一约束
@Entity
@Table(name = "processed_messages", 
       uniqueConstraints = @UniqueConstraint(columnNames = "message_id"))
public class ProcessedMessage {
@Id
private String messageId;
private Date processedAt;
}

// 幂等方案2:Redis SETNX
public boolean isProcessed(String messageId){
    String key = "processed:" + messageId;
// SET key value NX EX 86400  # 24小时过期
return !redisTemplate.opsForValue()
        .setIfAbsent(key, "1", Duration.ofHours(24));
}

5.3 运维阶段

  1. 监控必须到位才能睡觉
# 必须监控的四个黄金指标
# 1. 延迟(Latency): 消息从生产到消费的时间
# 2. 流量(Traffic): 生产/消费的QPS
# 3. 错误(Errors): 失败率、重试率
# 4. 饱和度(Saturation): 消息积压、磁盘使用率
  1. 必须有降级和熔断
@Component
public class MessageQueueCircuitBreaker {

private final CircuitBreaker circuitBreaker = CircuitBreaker.of(
"message-queue",
        CircuitBreakerConfig.custom()
            .failureRateThreshold(50)  // 失败率50%触发熔断
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .build()
    );

@CircuitBreaker(name = "message-queue", fallbackMethod = "fallbackSend")
public SendResult sendWithCircuitBreaker(String topic, String message){
return kafkaTemplate.send(topic, message).get();
    }

public SendResult fallbackSend(String topic, String message, Exception e){
// 1. 降级到本地存储
        localStorage.save(topic, message);
// 2. 记录日志
        log.warn("消息队列熔断,降级到本地存储", e);
// 3. 返回降级结果
return new DegradedSendResult();
    }
}

5.4 架构层面

  1. 消息队列的优缺点

    • 引入消息队列 → 系统复杂度增加
    • 异步处理 → 问题排查难度增加
    • 最终一致性 → 业务逻辑复杂度增加
  2. 根据团队能力选择技术栈

// 小团队(3-5人):RabbitMQ + 简单队列
// 优点:运维简单,社区成熟,文档丰富
// 缺点:吞吐量有限,功能相对简单

// 中等团队(10-20人):Kafka + 基础监控
// 优点:高吞吐,生态丰富
// 缺点:运维复杂,需要专人维护

// 大团队(50+人):自研消息中间件 + 完整监控体系
// 优点:完全可控,定制化强
// 缺点:研发成本高,需要专业团队

六、最后的话:消息队列是架构思维的试金石

在工作中我渐渐明白消息队列是架构思维的集中体现

它考验你的:

  • 抽象能力:能否从业务中抽象出合适的事件
  • 权衡能力:在可靠性和性能之间找到平衡点
  • 设计能力:设计出可扩展、可维护的消息流
  • 工程能力:构建稳定可靠的生产级系统

消息队列用得好不好,不在于你用了多少高级功能,而在于:

  1. 业务是否真正需要它(而不是为了用而用)
  2. 设计是否简洁清晰(而不是过度设计)
  3. 故障是否可控可恢复(而不是一出问题就全崩)
  4. 团队是否能够驾驭(而不是引入后就没人会维护)

希望本文分享的从工具认知到架构思维的演进,以及对Kafka等消息队列的实战剖析,能帮助你在系统设计中做出更明智的决策。技术的选择与落地,最终都是为了更好地服务于业务目标。


本文探讨了消息队列在系统架构中的核心地位与实战经验,更多关于微服务Java及高并发系统的深度讨论,欢迎访问云栈社区




上一篇:Spring Cloud Gateway高并发架构实战:异步非阻塞与分布式限流
下一篇:Ubuntu 22.04基于Docker部署Hadoop 3.4.1分布式集群实战教程
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 08:51 , Processed in 0.199413 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表