找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2228

积分

0

好友

312

主题
发表于 4 天前 | 查看: 12| 回复: 0

消息队列的消费模式主要分为两种:推(Push)拉(Pull)

  • 推(Push)模式:服务端主动将消息推送给客户端。其优点是消息实时性较好,但若客户端未做好流量控制,服务端推送大量消息可能导致客户端消息堆积甚至崩溃。
  • 拉(Pull)模式:客户端需要主动向服务端拉取消息。其优点在于客户端能依据自身消费能力控制拉取节奏,但拉取频率需要开发者自行控制,拉取过频会增加双方压力,间隔过长则可能导致消费延迟。

RocketMQ 的推模式实现类是 DefaultMQPushConsumerImpl。本质上,RocketMQ 的推模式是拉模式的一层封装,它内部实现了负载均衡等复杂逻辑,并为开发者提供了简洁易用的 API,降低了使用门槛。

本文将深入探讨 RocketMQ 的拉模式实现—— DefaultLitePullConsumer,解析其实现原理并介绍典型的使用场景。

1 示例-自动提交消费进度

首先,我们通过 DefaultLitePullConsumer 创建一个简单的、自动提交消费进度的拉模式消费者示例。

@Test
public void testAutoCommit() throws Exception {
    boolean running = true;
    // 定义消费者组 mygroup
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
    // 设置名字服务地址
    litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
    // 从最新的进度偏移量开始消费
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 订阅主题 TopicTest
    litePullConsumer.subscribe("TopicTest", "*");
    // 自动提交消费偏移量的选项设置为 true
    litePullConsumer.setAutoCommit(true);
    // 启动 pull 消费者
    litePullConsumer.start();
    try {
        while (running) {
            // 调用消费者的 poll 方法
            List<MessageExt> messageExts = litePullConsumer.poll();
            System.out.printf("%s%n", messageExts);
        }
    } finally {
        // 关闭消费者
        litePullConsumer.shutdown();
    }
}

代码核心流程如下:

  1. 创建 DefaultLitePullConsumer 实例,指定消费者组名为 "mygroup",并订阅主题 "TopicTest"
  2. 设置从最新偏移量开始消费,并开启自动提交消费偏移量功能。
  3. 启动消费者,在一个 while 循环中持续调用 poll() 方法拉取消息并处理。

与推模式消费者在监听器内定义消费逻辑不同,DefaultLitePullConsumer 需要开发者手动调用拉取方法来获取消息,并自行编写消费逻辑,这在设计Java应用时提供了更高的灵活性。

2 示例-手工提交消费进度

第一节的示例开启了自动提交。若需手动提交消费进度,则需在业务逻辑中显式调用 commitSync() 方法。

@Test
public void testNoAutoCommit() throws MQClientException {
    boolean running = true;
    // 定义消费者组 mygroup
    DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
    // 设置名字服务地址
    litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
    // 从最新的进度偏移量
    litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 订阅主题 TopicTest
    litePullConsumer.subscribe("TopicTest", "*");
    // 自动提交消费偏移量的选项设置为 false
    litePullConsumer.setAutoCommit(false);
    litePullConsumer.start();
    try {
        while (running) {
            boolean hasException = false;
            List<MessageExt> messageExts = litePullConsumer.poll(1000L);
            if (CollectionUtils.isNotEmpty(messageExts)) {
                System.out.println("消息大小:" + messageExts.size());
                // 取出第一条消息数据
                MessageExt first = messageExts.get(0);
                MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
                for (MessageExt messageExt : messageExts) {
                    // 打印消息内容
                    System.out.println(new String(first.getBody()));
                    // 业务处理代码
                    // doBusiness();
                }
                litePullConsumer.commitSync();
                if (hasException) {
                    // 回滚到第一条消息的消费点位
                    litePullConsumer.seek(messageQueue, first.getQueueOffset());
                }
            }
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        // 最终关闭消费者
        litePullConsumer.shutdown();
    }
}

此代码与前例的主要区别在于:

  1. litePullConsumer.setAutoCommit(false); 关闭了自动提交。
  2. 在成功处理一批消息后,调用 litePullConsumer.commitSync() 手动提交消费进度。
  3. 如果业务处理过程中发生异常,可通过 litePullConsumer.seek() 方法将消费点位回滚到这批消息的起始位置,实现消息重试。

通过测试可以发现拉模式的两个特点:

  • 每次调用 poll() 方法,通常会获取到同一队列的一批消息。
  • 多实例消费者同样支持负载均衡,因此拉模式完全可以用于生产环境,尤其在需要精细控制消费逻辑的后端 & 架构场景中。

3 实现原理

RocketMQ 推模式的消息处理流程可以简化为下图:

RocketMQ消息处理基本流程

接下来,我们将对照此流程,分析拉模式 (DefaultLitePullConsumer) 的具体实现差异。

1、负载均衡

DefaultLitePullConsumer 启动后,同样会执行负载均衡流程,此逻辑与推模式基本一致。

RocketMQ主题队列与消费组模型

如图所示,假设一个消费组内有两个消费者,某个主题下的四个队列会通过负载均衡平均分配给这两个消费者。分配完成后,拉模式会为每个分配到的队列启动独立的拉取任务。

2、拉取消息

拉模式与推模式在消息获取机制上差异显著。推模式下,消费者启动一个单线程的 PullMessageService 来统一处理所有队列的拉取请求。

而拉模式下,会启动一个定时任务线程池(默认20个线程)。如下图所示,如果消费者被分配了4个队列,则会创建4个独立的拉取任务(PullTaskImpl)。

DefaultLitePullConsumer拉取任务调度流程

每个拉取任务向其对应队列所在的Broker发送拉取请求,获取到的消息会放入一个本地的阻塞队列 consumerRequestCache 中缓存起来。

3、消费消息

在拉模式下,消费动作由开发者手动调用 poll() 方法触发。

DefaultLitePullConsumer poll方法调用示例

poll() 方法会返回一个消息列表,开发者遍历此列表并执行业务消费逻辑。那么,poll() 方法底层做了什么呢?

DefaultLitePullConsumer poll方法核心源码逻辑

其核心逻辑并不复杂:主要就是从本地的 LinkedBlockingQueue (consumerRequestCache) 中取出一个消费请求(ConsumerRequest)。取出后需要完成两件事:

  1. 从该请求对应的消费快照 ProcessQueue 中删除这些消息,并计算出新的消费偏移量。
  2. 更新本地维护的队列状态(AssignedMessageQueue)中的消费偏移量。这个状态类记录了队列的分配信息、消费快照、拉取偏移量等。

4、保存进度/消费重试

拉模式可以通过 setAutoCommit() 设置是否自动提交消费进度:

litePullConsumer.setAutoCommit(true); // true :自动提交 false:手工提交

自动提交的原理是,每次调用 poll() 方法时,会检查是否到达自动提交时间点。

DefaultLitePullConsumer自动提交检查逻辑

可以简单理解为,每隔 autoCommitIntervalMillis(默认5秒)会执行一次提交操作。

DefaultLitePullConsumer提交偏移量核心方法

若设置为手动提交,则开发者调用 commitSync() 方法,其内部最终会调用 commitAll() 方法,遍历所有被分配的消息队列并提交其消费偏移量。

5、消费重试

拉模式下若消费失败,可以调用 seek() 方法将指定队列的消费点位重置到特定位置,实现消息重试。

MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
// 回滚到第一条消息的点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());

seek() 方法的内部逻辑如下图所示:

DefaultLitePullConsumer seek方法执行流程

其核心流程如下:

  1. 获取目标队列的对象锁,保证操作原子性。
  2. 清除本地缓存中与该队列相关的数据(ProcessQueueconsumerRequestCache)。
  3. 中断该队列旧的拉取任务并从任务表中移除。
  4. 设置该队列的查找偏移量(Seek Offset),此偏移量将在下次拉取时使用,这是实现精准回退的关键。
  5. 为该队列创建并启动一个新的拉取任务。

4 应用场景

拉模式 (DefaultLitePullConsumer) 是非常值得掌握的一种消息消费方式,尤其适用于以下场景:

1、需要精细控制消费逻辑
拉模式提供了极高的灵活性。开发者可以在消费前执行条件判断,也可以在消费失败后,精确回退到某个偏移量重新消费,这在处理业务逻辑复杂的消息时非常有用。

2、高吞吐批处理或大数据场景
拉模式消费者默认会创建多个拉取线程(默认20个),每个线程独立负责一个队列的消息拉取。这种并发拉取机制在消息拉取效率上比单线程的推模式有显著优势,特别适合用于大数据领域的批处理任务或需要高吞吐量的数据库/中间件/技术栈集成场景。

希望本文能帮助你深入理解 RocketMQ 拉模式消费者的工作原理,并能在合适的项目中加以应用。更多关于分布式系统与消息队列的深度讨论,欢迎在云栈社区交流探讨。




上一篇:VS Code性能远超其他Electron应用?解析其底层架构与优化策略
下一篇:JavaScript + JSDoc + tsc:优雅获得类型安全与开发体验的实践指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 08:53 , Processed in 0.324506 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表