消息队列不是技术选型题,而是一道架构思维题。选错不是语法错误,是逻辑错误。
一、从“能用就行”到“必须可靠”的认知升级
1.1 第一阶段:消息队列 = 异步工具
早期认知:
“消息队列就是把同步调用改成异步,提升性能嘛。”
// 早期的“能用就行”写法
@Service
public class OrderServiceV1 {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 创建订单后,发个消息通知其他系统
public void createOrder(Order order){
// 保存订单到数据库
orderRepository.save(order);
// 发消息(成功失败不管)
kafkaTemplate.send("order-created",
order.getId().toString(),
order.toJson());
// 继续处理其他逻辑
// ...
}
}
问题暴露:
- 消息可能丢失(网络抖动、Broker重启)
- 消息可能重复(生产者重试)
- 消费者可能崩溃(消息积压)
- 顺序可能错乱(多分区)
1.2 第二阶段:消息队列 = 数据管道
中期认知:
“消息队列是系统间的数据通道,要保证数据不丢。”
// 中期的“保证可靠”写法
@Service
public class OrderServiceV2 {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public void createOrder(Order order){
// 1. 保存订单到数据库(本地事务)
orderRepository.save(order);
// 2. 发送消息(事务消息)
try {
kafkaTemplate.executeInTransaction(t -> {
t.send("order-created",
order.getId().toString(),
order.toJson());
return true;
});
} catch (Exception e) {
// 事务回滚
throw new RuntimeException("消息发送失败", e);
}
}
}
仍然存在的问题:
- 数据库事务和消息事务是两阶段,可能不一致
- 消费者幂等性问题没解决
- 消息积压时的降级策略缺失
1.3 第三阶段:消息队列 = 架构核心
现在的认知:
“消息队列是系统架构的骨架,定义了数据流向和处理模式。”
// 现在的“架构思维”写法
@Service
@Slf4j
public class OrderServiceV3 {
@Autowired
private OrderEventPublisher eventPublisher;
@Transactional
public Long createOrder(OrderCreateCommand command){
// 1. 验证业务规则
validateBusinessRules(command);
// 2. 创建订单聚合根
Order order = Order.create(command);
// 3. 保存到数据库(领域事件会自动发布)
orderRepository.save(order);
// 4. 返回订单ID(消息由基础设施层保证投递)
return order.getId();
}
}
// 领域事件发布器(基础设施层)
@Component
@Slf4j
public class OrderEventPublisher {
@Autowired
private DomainEventStore eventStore;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleOrderCreated(OrderCreatedEvent event){
// 1. 存储到事件存储(本地事务)
eventStore.save(event);
// 2. 异步发送到消息队列
CompletableFuture.runAsync(() -> {
try {
sendToMessageQueue(event);
} catch (Exception e) {
log.error("消息发送失败,事件已持久化,可通过补偿任务重试", e);
// 记录失败,后续有补偿机制
eventStore.markAsFailed(event.getId(), e.getMessage());
}
});
}
// 定时补偿任务
@Scheduled(fixedDelay = 60000)
public void retryFailedEvents(){
List<DomainEvent> failedEvents = eventStore.findFailedEvents();
// 重试发送...
}
}
二、四大核心问题:每个都是架构决策点
2.1 问题一:消息丢失 vs 消息重复,你选哪个?
错误思维: “我要保证既不丢失也不重复”
现实: 分布式系统中,这是CAP的变种——你只能优先保证一个。
架构决策树:
消息可靠性要求高(支付、交易)?
├── 是 → 选择:允许重复,不允许丢失
│ ├── 解决方案:消费者幂等 + 生产者重试无限次
│ └── 代价:需要幂等设计和重复检测
│
└── 否 → 选择:允许丢失,不允许重复
├── 解决方案:生产者acks=1 + 消费者自动提交
└── 代价:可能丢失少量消息
实战配置:
// 场景A:支付回调(不允许丢失)
@Configuration
public class PaymentProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 保证顺序
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等生产者
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-producer");
return new DefaultKafkaProducerFactory<>(props);
}
}
// 场景B:用户行为日志(允许少量丢失)
@Configuration
public class LogProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.ACKS_CONFIG, "1"); // Leader确认即可
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试3次
props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // 批量发送
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小
return new DefaultKafkaProducerFactory<>(props);
}
}
2.2 问题二:顺序保证 vs 并发性能,你如何权衡?
错误案例: 为了性能开16个分区,结果订单状态乱了。
架构解决方案:
// 方案A:严格顺序(单分区)
public class StrictOrderProcessor {
// 关键:相同的订单ID总是路由到同一个分区
private int calculatePartition(String orderId, int partitionCount){
return Math.abs(orderId.hashCode()) % partitionCount;
}
// 生产者发送时指定分区
public void sendOrderEvent(OrderEvent event){
int partition = calculatePartition(event.getOrderId(), 16);
kafkaTemplate.send("order-events", partition,
event.getOrderId(), event.toJson());
}
}
// 方案B:局部顺序(业务拆分)
public class PartialOrderProcessor {
// 不同业务类型走不同Topic,各自保证顺序
public void sendEvent(DomainEvent event){
String topic = resolveTopicByEventType(event.getType());
kafkaTemplate.send(topic, event.getAggregateId(), event.toJson());
}
private String resolveTopicByEventType(String eventType){
switch (eventType) {
case "ORDER_CREATED":
case "ORDER_PAID":
case "ORDER_SHIPPED":
return "order-lifecycle-events"; // 订单生命周期,需要顺序
case "ORDER_VIEWED":
case "ORDER_SHARED":
return "order-activity-events"; // 订单活动,不需要严格顺序
default:
return "general-events";
}
}
}
2.3 问题三:实时处理 vs 批量处理,你设计对了吗?
错误做法: 所有消息都实时处理,结果DB被打爆。
架构思维: 根据业务特点选择处理模式。
// 实时处理模式(支付回调)
@Component
@Slf4j
public class RealTimePaymentProcessor {
@KafkaListener(topics = "payment-callback", concurrency = "3")
@Transactional
public void processPayment(PaymentCallback callback){
// 立即处理,不能延迟
paymentService.confirmPayment(callback);
log.info("实时处理支付回调: {}", callback.getPaymentId());
}
}
// 批量处理模式(数据统计)
@Component
@Slf4j
public class BatchStatProcessor {
// 批量监听,每100条或每5秒处理一次
@KafkaListener(topics = "user-behavior",
containerFactory = "batchContainerFactory")
@Transactional
public void processUserBehavior(List<ConsumerRecord<String, String>> records){
if (records.isEmpty()) {
return;
}
// 批量插入数据库
List<UserBehavior> behaviors = records.stream()
.map(record -> convertToBehavior(record.value()))
.collect(Collectors.toList());
// 使用JPA批量保存
userBehaviorRepository.saveAll(behaviors);
log.info("批量处理用户行为数据: {}条", records.size());
}
// 批量容器配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
batchContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // 开启批量监听
factory.getContainerProperties().setPollTimeout(5000); // 5秒超时
factory.getContainerProperties().setAckMode(AckMode.BATCH);
return factory;
}
}
2.4 问题四:同步调用 vs 事件驱动,你理解本质了吗?
三年后才明白的区别:
// 传统同步架构(服务耦合)
@Service
public class SyncOrderService {
public void createOrder(Order order){
// 1. 保存订单
orderRepository.save(order);
// 2. 同步调用库存服务(强依赖)
stockService.deduct(order.getItems());
// 3. 同步调用积分服务(强依赖)
pointService.addPoints(order.getUserId(), order.getAmount());
// 4. 同步发送通知(强依赖)
notificationService.sendSms(order.getUserPhone());
// 问题:任何一个服务挂掉,整个流程失败
// 问题:响应时间 = 所有服务耗时之和
}
}
// 事件驱动架构(服务解耦)
@Service
@Transactional
@Slf4j
public class EventDrivenOrderService {
@Autowired
private DomainEventPublisher eventPublisher;
public void createOrder(Order order){
// 1. 保存订单(聚合根)
orderRepository.save(order);
// 2. 发布领域事件
// 注意:事件发布在事务提交后执行
eventPublisher.publish(new OrderCreatedEvent(order));
// 响应时间 ≈ 数据库操作时间
// 库存、积分、通知等由各自的事件处理器异步处理
// 单个服务挂掉不影响主流程
}
}
// 事件处理器(独立服务,可单独部署)
@Component
@Slf4j
public class OrderCreatedEventHandler {
@EventListener
@Async("orderProcessorExecutor")
public void handleOrderCreated(OrderCreatedEvent event){
try {
// 扣减库存
stockService.deduct(event.getItems());
// 增加积分
pointService.addPoints(event.getUserId(), event.getAmount());
// 发送通知
notificationService.sendSms(event.getUserPhone());
} catch (Exception e) {
// 单个处理器失败不影响其他处理器
log.error("处理订单创建事件失败", e);
// 进入死信队列,有补偿机制
}
}
}
三、消息队列的四种架构模式
3.1 模式一:事件通知(最常用)
场景: 状态变化通知
// 订单状态变化时通知相关方
public class OrderStatusChangedEvent {
private String orderId;
private String oldStatus;
private String newStatus;
private Date changedTime;
}
// 多个消费者独立处理
// 1. 库存系统:监听发货通知,释放库存占用
// 2. 客服系统:监听异常订单,自动创建工单
// 3. 财务系统:监听完成订单,生成结算单
3.2 模式二:事件溯源(最强大)
场景: 需要完整审计追踪
// 存储所有状态变化事件
@Entity
public class OrderAggregate {
@Id
private String orderId;
// 当前状态(可从事件重建)
private String currentStatus;
// 所有事件列表
@OneToMany(cascade = CascadeType.ALL)
private List<OrderEvent> events = new ArrayList<>();
// 应用事件,更新状态
public void applyEvent(OrderEvent event){
this.events.add(event);
this.currentStatus = event.getNewStatus();
}
// 从事件重建聚合(用于数据修复)
public static OrderAggregate rebuildFromEvents(List<OrderEvent> events){
OrderAggregate order = new OrderAggregate();
events.forEach(order::applyEvent);
return order;
}
}
3.3 模式三:消息总线(最解耦)
场景: 微服务间通信
// 统一事件格式
public abstract class DomainEvent {
private String eventId;
private String aggregateType; // Order, User, Product
private String aggregateId;
private String eventType;
private Date occurredAt;
private Map<String, Object> payload;
}
// 统一事件处理器接口
public interface EventHandler<T extends DomainEvent> {
boolean canHandle(String eventType);
void handle(T event);
}
// 事件分发器
@Component
public class EventDispatcher {
private final Map<String, List<EventHandler>> handlers = new HashMap<>();
@KafkaListener(topics = "#{'${event.topics}'.split(',')}")
public void dispatch(String message){
DomainEvent event = deserialize(message);
List<EventHandler> eventHandlers = handlers.get(event.getEventType());
if (eventHandlers != null) {
eventHandlers.forEach(handler -> {
try {
handler.handle(event);
} catch (Exception e) {
log.error("事件处理失败", e);
// 不影响其他处理器
}
});
}
}
}
3.4 模式四:流处理(最实时)
场景: 实时数据分析
// 使用Kafka Streams实时统计
public class OrderStatisticsStream {
@Bean
public KStream<String, OrderEvent> orderStream(StreamsBuilder builder){
KStream<String, OrderEvent> stream = builder
.stream("order-events", Consumed.with(Serdes.String(), orderEventSerde()));
// 实时统计每分钟订单数
stream
.groupBy((key, value) -> {
// 按分钟分组
Instant instant = value.getCreatedAt().toInstant();
return instant.atZone(ZoneId.systemDefault())
.truncatedTo(ChronoUnit.MINUTES)
.toString();
})
.count(Materialized.as("orders-per-minute"))
.toStream()
.to("order-stats-minute", Produced.with(Serdes.String(), Serdes.Long()));
// 实时统计每个用户的订单金额
stream
.filter((key, value) -> "ORDER_PAID".equals(value.getEventType()))
.groupBy((key, value) -> value.getUserId())
.aggregate(
() -> 0.0,
(userId, event, total) -> total + event.getAmount(),
Materialized.as("user-order-total")
)
.toStream()
.to("user-order-totals", Produced.with(Serdes.String(), Serdes.Double()));
return stream;
}
}
四、实战:从零设计一个可靠的消息系统
4.1 第一步:明确业务需求(不要从技术出发!)
需求分析模板:
业务场景:订单支付回调
可靠性要求:极高(不允许丢失或重复扣款)
顺序要求:中等(同一订单的支付、退款需要顺序)
延迟要求:高(5秒内必须处理)
吞吐量:1000 TPS
数据大小:平均2KB
业务场景:用户行为日志
可靠性要求:低(允许丢失5%)
顺序要求:低(不需要)
延迟要求:低(5分钟内处理即可)
吞吐量:10000 TPS
数据大小:平均500B
4.2 第二步:选择合适的技术栈
技术选型决策矩阵:
public class MessageQueueSelector {
public MessageQueue select(Requirements req){
if (req.getReliability() == Reliability.HIGH) {
if (req.getThroughput() > 10000) {
return MessageQueue.KAFKA; // 高可靠+高吞吐
} else {
return MessageQueue.ROCKETMQ; // 高可靠+事务消息
}
} else if (req.getOrdering() == Ordering.STRICT) {
return MessageQueue.PULSAR; // 严格顺序保证
} else if (req.getLatency() == Latency.MILLISECOND) {
return MessageQueue.RABBITMQ; // 低延迟
} else {
return MessageQueue.KAFKA; // 默认选择
}
}
}
// 实际项目中的多队列架构
@Configuration
public class MultiQueueConfig {
// 支付回调:RocketMQ(事务消息)
@Bean
public RocketMQTemplate paymentTemplate(){
// 配置事务消息支持
}
// 订单事件:Kafka(高吞吐)
@Bean
public KafkaTemplate<String, String> orderTemplate(){
// 配置高吞吐
}
// 实时通知:RabbitMQ(低延迟)
@Bean
public RabbitTemplate notificationTemplate(){
// 配置低延迟
}
}
4.3 第三步:设计生产者架构
生产者的四个核心责任:
@Component
@Slf4j
public class ReliableProducer {
// 责任1:消息本地持久化(防丢失)
@Autowired
private MessageStore messageStore;
// 责任2:异步发送(不阻塞业务)
@Autowired
private ExecutorService asyncExecutor;
// 责任3:失败重试(保证送达)
@Autowired
private RetryPolicy retryPolicy;
// 责任4:监控埋点(可观测性)
@Autowired
private MetricsRecorder metricsRecorder;
public CompletableFuture<SendResult> sendReliably(String topic,
String key,
String message){
// 1. 生成唯一ID(用于去重)
String messageId = generateMessageId();
// 2. 本地持久化(WAL机制)
MessageRecord record = new MessageRecord(messageId, topic, key, message);
messageStore.save(record);
// 3. 异步发送
return CompletableFuture.supplyAsync(() -> {
SendResult result = null;
Exception lastException = null;
// 按照重试策略发送
for (int attempt = 1; attempt <= retryPolicy.getMaxAttempts(); attempt++) {
try {
result = doSend(topic, key, message);
// 发送成功,更新状态
messageStore.markAsSent(messageId);
metricsRecorder.recordSuccess(topic);
return result;
} catch (Exception e) {
lastException = e;
log.warn("消息发送失败,第{}次重试", attempt, e);
// 记录失败
messageStore.markAsFailed(messageId, e.getMessage());
metricsRecorder.recordFailure(topic);
// 等待重试间隔
if (attempt < retryPolicy.getMaxAttempts()) {
sleep(retryPolicy.getBackoff(attempt));
}
}
}
// 所有重试都失败
messageStore.markAsDead(messageId);
throw new MessageSendException("消息发送失败", lastException);
}, asyncExecutor);
}
// 定时任务:扫描失败消息并重试
@Scheduled(fixedDelay = 60000)
public void retryFailedMessages(){
List<MessageRecord> failedMessages = messageStore.findFailedMessages();
failedMessages.forEach(record -> {
sendReliably(record.getTopic(),
record.getKey(),
record.getMessage());
});
}
}
4.4 第四步:设计消费者架构
消费者的四个核心责任:
@Component
@Slf4j
public class ReliableConsumer {
// 责任1:幂等处理(防重复)
@Autowired
private IdempotenceChecker idempotenceChecker;
// 责任2:死信处理(防阻塞)
@Autowired
private DeadLetterHandler deadLetterHandler;
// 责任3:流量控制(防打垮)
@Autowired
private RateLimiter rateLimiter;
// 责任4:手动提交(防丢失)
private final Acknowledgment ack;
@KafkaListener(topics = "order-events")
public void consume(ConsumerRecord<String, String> record){
String messageId = extractMessageId(record);
// 1. 幂等检查(已处理过的消息直接跳过)
if (idempotenceChecker.isProcessed(messageId)) {
log.info("消息已处理过,跳过: {}", messageId);
ack.acknowledge(); // 仍然提交offset
return;
}
try {
// 2. 流量控制(防止突发流量打垮系统)
if (!rateLimiter.tryAcquire()) {
log.warn("达到流量限制,暂停消费");
Thread.sleep(1000);
// 不提交offset,下次继续消费这条消息
return;
}
// 3. 业务处理
processBusinessLogic(record.value());
// 4. 记录已处理(幂等标记)
idempotenceChecker.markAsProcessed(messageId);
// 5. 手动提交offset(处理成功才提交)
ack.acknowledge();
log.info("消息处理成功: {}", messageId);
} catch (BusinessException e) {
// 业务异常(如余额不足),进入死信队列
log.error("业务处理失败,进入死信队列", e);
deadLetterHandler.sendToDlq(record, e.getMessage());
ack.acknowledge(); // 提交offset,不再重试
} catch (SystemException e) {
// 系统异常(如数据库连接失败),需要重试
log.error("系统处理失败,等待重试", e);
// 不提交offset,让消息重新消费
throw e;
} catch (Exception e) {
// 未知异常
log.error("未知异常", e);
deadLetterHandler.sendToDlq(record, "UNKNOWN_ERROR");
ack.acknowledge();
}
}
// 批量消费版本
@KafkaListener(topics = "user-behavior",
containerFactory = "batchContainerFactory")
public void consumeBatch(List<ConsumerRecord<String, String>> records){
// 批量处理逻辑
List<CompletableFuture<Void>> futures = records.stream()
.map(record -> CompletableFuture.runAsync(() -> {
try {
consume(record);
} catch (Exception e) {
log.error("批量处理中单条消息失败", e);
}
}))
.collect(Collectors.toList());
// 等待所有消息处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
// 批量提交offset
ack.acknowledge();
})
.exceptionally(e -> {
log.error("批量处理失败", e);
return null;
});
}
}
4.5 第五步:设计监控与运维体系
监控指标清单:
# 生产者监控
producer_metrics:
- send_success_rate:发送成功率
- send_latency_p99:发送延迟P99
- retry_count:重试次数
- buffer_usage:缓冲区使用率
# 消费者监控
consumer_metrics:
- consume_lag:消费延迟(消息积压)
- process_success_rate:处理成功率
- process_latency_p99:处理延迟P99
- dead_letter_count:死信数量
# 系统监控
system_metrics:
- broker_health:Broker健康状态
- disk_usage:磁盘使用率
- network_io:网络IO
- connection_count:连接数
告警规则配置:
@Component
public class MessageQueueAlerts {
// 规则1:消息积压告警
@Scheduled(fixedRate = 60000)
public void checkConsumerLag(){
Map<TopicPartition, Long> lags = getConsumerLag();
lags.forEach((tp, lag) -> {
if (lag > 10000) { // 积压超过1万条
alertService.sendAlert(
"CONSUMER_LAG_HIGH",
Map.of(
"topic", tp.topic(),
"partition", tp.partition(),
"lag", lag.toString()
)
);
// 自动扩容消费者
if (canScaleOut(tp.topic())) {
scaleOutConsumer(tp.topic());
}
}
});
}
// 规则2:发送失败告警
@EventListener
public void handleSendFailure(SendFailureEvent event){
if (event.getFailureCount() > 10) {
alertService.sendAlert(
"PRODUCER_SEND_FAILURE",
Map.of(
"topic", event.getTopic(),
"failureCount", String.valueOf(event.getFailureCount()),
"lastError", event.getLastError()
)
);
}
}
// 规则3:死信过多告警
@Scheduled(fixedRate = 300000) // 5分钟
public void checkDeadLetter(){
long deadLetterCount = deadLetterService.getCountLastHour();
if (deadLetterCount > 100) {
alertService.sendAlert(
"DEAD_LETTER_HIGH",
Map.of("count", String.valueOf(deadLetterCount))
);
}
}
}
五、血泪教训:这三年我总结的10条军规
5.1 设计阶段
- 先定义消息契约,再写代码
// ❌ 错误:直接发JSON字符串
kafkaTemplate.send("order-events", orderId, order.toString());
// ✅ 正确:定义事件契约
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderCreatedEvent implements DomainEvent{
private String eventId;
private String orderId;
private String userId;
private BigDecimal amount;
private Date createdAt;
private String eventType = "ORDER_CREATED";
private int version = 1; // 版本号,用于兼容性
}
- 一个Topic只做一件事
// ❌ 错误:订单所有事件都发到一个Topic
// order-events: 包含创建、支付、发货、退款...
// ✅ 正确:按业务语义拆分Topic
// order-lifecycle-events: 创建、支付、发货(需要顺序)
// order-activity-events: 浏览、分享、收藏(不需要顺序)
// order-compensation-events: 退款、售后(补偿操作)
5.2 开发阶段
- 生产者必须考虑失败场景
// ❌ 错误:发送后不管
kafkaTemplate.send(topic, message);
// ✅ 正确:处理发送结果
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, message);
future.addCallback(
result -> log.info("发送成功: {}", result),
ex -> {
log.error("发送失败", ex);
// 1. 记录到本地存储
// 2. 触发告警
// 3. 启动补偿任务
}
);
- 消费者必须实现幂等
// 幂等方案1:数据库唯一约束
@Entity
@Table(name = "processed_messages",
uniqueConstraints = @UniqueConstraint(columnNames = "message_id"))
public class ProcessedMessage {
@Id
private String messageId;
private Date processedAt;
}
// 幂等方案2:Redis SETNX
public boolean isProcessed(String messageId){
String key = "processed:" + messageId;
// SET key value NX EX 86400 # 24小时过期
return !redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofHours(24));
}
5.3 运维阶段
- 监控必须到位才能睡觉
# 必须监控的四个黄金指标
# 1. 延迟(Latency): 消息从生产到消费的时间
# 2. 流量(Traffic): 生产/消费的QPS
# 3. 错误(Errors): 失败率、重试率
# 4. 饱和度(Saturation): 消息积压、磁盘使用率
- 必须有降级和熔断
@Component
public class MessageQueueCircuitBreaker {
private final CircuitBreaker circuitBreaker = CircuitBreaker.of(
"message-queue",
CircuitBreakerConfig.custom()
.failureRateThreshold(50) // 失败率50%触发熔断
.waitDurationInOpenState(Duration.ofSeconds(30))
.build()
);
@CircuitBreaker(name = "message-queue", fallbackMethod = "fallbackSend")
public SendResult sendWithCircuitBreaker(String topic, String message){
return kafkaTemplate.send(topic, message).get();
}
public SendResult fallbackSend(String topic, String message, Exception e){
// 1. 降级到本地存储
localStorage.save(topic, message);
// 2. 记录日志
log.warn("消息队列熔断,降级到本地存储", e);
// 3. 返回降级结果
return new DegradedSendResult();
}
}
5.4 架构层面
-
消息队列的优缺点
- 引入消息队列 → 系统复杂度增加
- 异步处理 → 问题排查难度增加
- 最终一致性 → 业务逻辑复杂度增加
-
根据团队能力选择技术栈
// 小团队(3-5人):RabbitMQ + 简单队列
// 优点:运维简单,社区成熟,文档丰富
// 缺点:吞吐量有限,功能相对简单
// 中等团队(10-20人):Kafka + 基础监控
// 优点:高吞吐,生态丰富
// 缺点:运维复杂,需要专人维护
// 大团队(50+人):自研消息中间件 + 完整监控体系
// 优点:完全可控,定制化强
// 缺点:研发成本高,需要专业团队
六、最后的话:消息队列是架构思维的试金石
在工作中我渐渐明白消息队列是架构思维的集中体现。
它考验你的:
- 抽象能力:能否从业务中抽象出合适的事件
- 权衡能力:在可靠性和性能之间找到平衡点
- 设计能力:设计出可扩展、可维护的消息流
- 工程能力:构建稳定可靠的生产级系统
消息队列用得好不好,不在于你用了多少高级功能,而在于:
- 业务是否真正需要它(而不是为了用而用)
- 设计是否简洁清晰(而不是过度设计)
- 故障是否可控可恢复(而不是一出问题就全崩)
- 团队是否能够驾驭(而不是引入后就没人会维护)
希望本文分享的从工具认知到架构思维的演进,以及对Kafka等消息队列的实战剖析,能帮助你在系统设计中做出更明智的决策。技术的选择与落地,最终都是为了更好地服务于业务目标。
本文探讨了消息队列在系统架构中的核心地位与实战经验,更多关于微服务、Java及高并发系统的深度讨论,欢迎访问云栈社区。