消息队列的消费模式主要分为两种:推(Push) 和 拉(Pull)。
- 推(Push)模式:服务端主动将消息推送给客户端。其优点是消息实时性较好,但若客户端未做好流量控制,服务端推送大量消息可能导致客户端消息堆积甚至崩溃。
- 拉(Pull)模式:客户端需要主动向服务端拉取消息。其优点在于客户端能依据自身消费能力控制拉取节奏,但拉取频率需要开发者自行控制,拉取过频会增加双方压力,间隔过长则可能导致消费延迟。
RocketMQ 的推模式实现类是 DefaultMQPushConsumerImpl。本质上,RocketMQ 的推模式是拉模式的一层封装,它内部实现了负载均衡等复杂逻辑,并为开发者提供了简洁易用的 API,降低了使用门槛。
本文将深入探讨 RocketMQ 的拉模式实现—— DefaultLitePullConsumer,解析其实现原理并介绍典型的使用场景。
1 示例-自动提交消费进度
首先,我们通过 DefaultLitePullConsumer 创建一个简单的、自动提交消费进度的拉模式消费者示例。
@Test
public void testAutoCommit() throws Exception {
boolean running = true;
// 定义消费者组 mygroup
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
// 设置名字服务地址
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
// 从最新的进度偏移量开始消费
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题 TopicTest
litePullConsumer.subscribe("TopicTest", "*");
// 自动提交消费偏移量的选项设置为 true
litePullConsumer.setAutoCommit(true);
// 启动 pull 消费者
litePullConsumer.start();
try {
while (running) {
// 调用消费者的 poll 方法
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s%n", messageExts);
}
} finally {
// 关闭消费者
litePullConsumer.shutdown();
}
}
代码核心流程如下:
- 创建
DefaultLitePullConsumer 实例,指定消费者组名为 "mygroup",并订阅主题 "TopicTest"。
- 设置从最新偏移量开始消费,并开启自动提交消费偏移量功能。
- 启动消费者,在一个
while 循环中持续调用 poll() 方法拉取消息并处理。
与推模式消费者在监听器内定义消费逻辑不同,DefaultLitePullConsumer 需要开发者手动调用拉取方法来获取消息,并自行编写消费逻辑,这在设计Java应用时提供了更高的灵活性。
2 示例-手工提交消费进度
第一节的示例开启了自动提交。若需手动提交消费进度,则需在业务逻辑中显式调用 commitSync() 方法。
@Test
public void testNoAutoCommit() throws MQClientException {
boolean running = true;
// 定义消费者组 mygroup
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("mygroup");
// 设置名字服务地址
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
// 从最新的进度偏移量
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题 TopicTest
litePullConsumer.subscribe("TopicTest", "*");
// 自动提交消费偏移量的选项设置为 false
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
try {
while (running) {
boolean hasException = false;
List<MessageExt> messageExts = litePullConsumer.poll(1000L);
if (CollectionUtils.isNotEmpty(messageExts)) {
System.out.println("消息大小:" + messageExts.size());
// 取出第一条消息数据
MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
for (MessageExt messageExt : messageExts) {
// 打印消息内容
System.out.println(new String(first.getBody()));
// 业务处理代码
// doBusiness();
}
litePullConsumer.commitSync();
if (hasException) {
// 回滚到第一条消息的消费点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());
}
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 最终关闭消费者
litePullConsumer.shutdown();
}
}
此代码与前例的主要区别在于:
litePullConsumer.setAutoCommit(false); 关闭了自动提交。
- 在成功处理一批消息后,调用
litePullConsumer.commitSync() 手动提交消费进度。
- 如果业务处理过程中发生异常,可通过
litePullConsumer.seek() 方法将消费点位回滚到这批消息的起始位置,实现消息重试。
通过测试可以发现拉模式的两个特点:
- 每次调用
poll() 方法,通常会获取到同一队列的一批消息。
- 多实例消费者同样支持负载均衡,因此拉模式完全可以用于生产环境,尤其在需要精细控制消费逻辑的后端 & 架构场景中。
3 实现原理
RocketMQ 推模式的消息处理流程可以简化为下图:

接下来,我们将对照此流程,分析拉模式 (DefaultLitePullConsumer) 的具体实现差异。
1、负载均衡
DefaultLitePullConsumer 启动后,同样会执行负载均衡流程,此逻辑与推模式基本一致。

如图所示,假设一个消费组内有两个消费者,某个主题下的四个队列会通过负载均衡平均分配给这两个消费者。分配完成后,拉模式会为每个分配到的队列启动独立的拉取任务。
2、拉取消息
拉模式与推模式在消息获取机制上差异显著。推模式下,消费者启动一个单线程的 PullMessageService 来统一处理所有队列的拉取请求。
而拉模式下,会启动一个定时任务线程池(默认20个线程)。如下图所示,如果消费者被分配了4个队列,则会创建4个独立的拉取任务(PullTaskImpl)。

每个拉取任务向其对应队列所在的Broker发送拉取请求,获取到的消息会放入一个本地的阻塞队列 consumerRequestCache 中缓存起来。
3、消费消息
在拉模式下,消费动作由开发者手动调用 poll() 方法触发。

poll() 方法会返回一个消息列表,开发者遍历此列表并执行业务消费逻辑。那么,poll() 方法底层做了什么呢?

其核心逻辑并不复杂:主要就是从本地的 LinkedBlockingQueue (consumerRequestCache) 中取出一个消费请求(ConsumerRequest)。取出后需要完成两件事:
- 从该请求对应的消费快照
ProcessQueue 中删除这些消息,并计算出新的消费偏移量。
- 更新本地维护的队列状态(
AssignedMessageQueue)中的消费偏移量。这个状态类记录了队列的分配信息、消费快照、拉取偏移量等。
4、保存进度/消费重试
拉模式可以通过 setAutoCommit() 设置是否自动提交消费进度:
litePullConsumer.setAutoCommit(true); // true :自动提交 false:手工提交
自动提交的原理是,每次调用 poll() 方法时,会检查是否到达自动提交时间点。

可以简单理解为,每隔 autoCommitIntervalMillis(默认5秒)会执行一次提交操作。

若设置为手动提交,则开发者调用 commitSync() 方法,其内部最终会调用 commitAll() 方法,遍历所有被分配的消息队列并提交其消费偏移量。
5、消费重试
拉模式下若消费失败,可以调用 seek() 方法将指定队列的消费点位重置到特定位置,实现消息重试。
MessageExt first = messageExts.get(0);
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
// 回滚到第一条消息的点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());
seek() 方法的内部逻辑如下图所示:

其核心流程如下:
- 获取目标队列的对象锁,保证操作原子性。
- 清除本地缓存中与该队列相关的数据(
ProcessQueue 和 consumerRequestCache)。
- 中断该队列旧的拉取任务并从任务表中移除。
- 设置该队列的查找偏移量(Seek Offset),此偏移量将在下次拉取时使用,这是实现精准回退的关键。
- 为该队列创建并启动一个新的拉取任务。
4 应用场景
拉模式 (DefaultLitePullConsumer) 是非常值得掌握的一种消息消费方式,尤其适用于以下场景:
1、需要精细控制消费逻辑
拉模式提供了极高的灵活性。开发者可以在消费前执行条件判断,也可以在消费失败后,精确回退到某个偏移量重新消费,这在处理业务逻辑复杂的消息时非常有用。
2、高吞吐批处理或大数据场景
拉模式消费者默认会创建多个拉取线程(默认20个),每个线程独立负责一个队列的消息拉取。这种并发拉取机制在消息拉取效率上比单线程的推模式有显著优势,特别适合用于大数据领域的批处理任务或需要高吞吐量的数据库/中间件/技术栈集成场景。
希望本文能帮助你深入理解 RocketMQ 拉模式消费者的工作原理,并能在合适的项目中加以应用。更多关于分布式系统与消息队列的深度讨论,欢迎在云栈社区交流探讨。