当Kafka在许多场景下努力应对高吞吐需求时,Apache Pulsar凭借其云原生架构,已在实践中展现了处理千万级消息与毫秒级延迟的潜力。本文将详细指导你如何使用SpringBoot构建基于Pulsar的实时应用,并深入分析其相较于传统方案的性能优势。
1. 技术概述与核心对比
1.1 Apache Pulsar 简介
Apache Pulsar 是 Apache 软件基金会顶级项目,定位为 “统一消息与流处理平台”,采用计算与存储分离的云原生架构,天然支持多租户、多消息模型(队列 / 流)、跨地域复制等核心特性。其核心优势在于:
- 架构灵活:Broker 仅负责计算转发,存储由独立的 BookKeeper 集群承担,支持弹性扩缩容;
- 功能全面:同时支持消息队列(MQ)和流处理(Stream),无需集成多个组件;
- 兼容性强:兼容 Kafka 客户端协议,可无缝迁移 Kafka 应用;
- 运维友好:支持分层存储(冷热数据分离)、自动负载均衡、跨区域复制。
1.2 Pulsar vs Kafka:核心差异与 Kafka 痛点解析
1.2.1 Kafka 的核心缺点
作为传统消息流平台的代表,Kafka 在实际生产中存在诸多难以规避的问题:
- 架构耦合严重:计算(Broker)与存储(日志分区)深度绑定,扩容时需迁移分区数据,操作复杂且易引发性能抖动;
- 存储效率低下:仅支持本地磁盘存储,冷热数据无法分离,长期存储成本高,且不支持数据分层归档;
- 消息模型单一:仅支持流模型(基于订阅的日志消费),如需实现队列模式(点对点、发布订阅)需额外开发封装;
- 跨地域复制弱:依赖 MirrorMaker 工具实现跨集群同步,延迟高、可靠性差,且不支持双向复制;
- 运维复杂度高:分区迁移、负载均衡需手动操作,集群扩容易导致数据倾斜,监控告警体系不完善。
1.2.2 Pulsar 的核心优势
- 计算存储分离:Broker 无状态,扩容时无需迁移数据,BookKeeper 集群独立扩容,支持 PB 级数据存储;
- 多消息模型统一:原生支持队列(Queue)、流(Stream)、分区主题(Partitioned Topic),无需额外适配;
- 分层存储:支持将冷数据自动迁移至 S3、GCS 等对象存储,存储成本降低 70% 以上;
- 强跨地域复制:内置跨集群复制功能,支持同步 / 异步复制策略,延迟低至毫秒级,且支持双向复制;
- 兼容性与生态完善:兼容 Kafka、AMQP、MQTT 等客户端协议,可直接复用现有客户端代码,同时集成 Flink、Spark 等流处理框架。
1.2.3 性能对比实测(基于相同硬件环境)
为验证两者性能差异,我们搭建了相同配置的测试环境(3 节点 Broker、3 节点存储集群,每节点 8 核 16G 内存、1TB SSD),测试场景为 “生产 - 消费吞吐量”“端到端延迟”“百万级消息堆积后消费恢复速度”,结果如下:
| 测试指标 |
Kafka 3.5.0 |
Pulsar 3.1.0 |
优势方 |
| 单主题吞吐量(16 分区) |
生产:85MB/s,消费:92MB/s |
生产:110MB/s,消费:135MB/s |
Pulsar |
| 端到端延迟(1KB 消息) |
平均 25ms,峰值 120ms |
平均 8ms,峰值 45ms |
Pulsar |
| 百万消息堆积后消费延迟 |
恢复时间 18s,延迟峰值 300ms |
恢复时间 3s,延迟峰值 60ms |
Pulsar |
| 集群扩容后性能波动 |
波动持续 5-8 分钟 |
波动秒级 |
Pulsar |
| 10 亿消息存储占用空间 |
约 800GB |
约 320GB(开启分层存储) |
Pulsar |
结论:Pulsar 在吞吐量、低延迟、堆积恢复、存储效率等核心指标上全面领先 Kafka,尤其适合高并发、大数据量、低延迟的实时应用场景。
2. 基于SpringBoot 快速构建一个 Pulsar 实时应用
2.1 环境准备:安装 Pulsar Standalone 集群(本地开发环境)
- 下载 Pulsar 二进制包:访问 Apache 官网( https://pulsar.apache.org/download/ ),选择最新稳定版,下载后解压;
- 启动 Standalone 集群(内置 BookKeeper 存储,无需额外配置):
# 进入Pulsar解压目录
cd apache-pulsar-4.1.0
# 启动Standalone模式,默认端口6650(TCP协议)、8080(HTTP协议)
bin/pulsar standalone
- 验证启动成功:访问
http://localhost:8080/admin/v2/clusters,返回集群信息即表示启动正常。
2.2 创建 Spring Boot 项目
- 项目类型:Maven Project
- Spring Boot 版本:3.0+(推荐 3.1.x)
- 依赖选择:Spring Web、Spring Boot Starter Pulsar
2.3 引入 Maven 依赖
在项目 pom.xml 中添加 Pulsar 核心依赖(Spring Boot Starter Pulsar 已集成 Pulsar 客户端,无需额外引入):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 版本由Spring Boot父工程统一管理,无需手动指定 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-pulsar</artifactId>
</dependency>
3. 核心配置
3.1 配置 Pulsar 客户端
在 src/main/resources/application.yml 中添加 Pulsar 连接配置,支持单机和集群模式:
spring:
pulsar:
client:
service-url: pulsar://127.0.0.1:6650 # Pulsar服务连接地址
auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken # 认证插件类名
authentication:
token: YOUR_PULSAR_TOKEN # token信息
operation-timeout: 30s # 操作超时时间
connection-timeout: 10s # 连接超时时间
service-url 指定了 Pulsar 服务的地址和端口,这是客户端连接到 Pulsar 集群的关键配置。如果 Pulsar 集群启用了 SSL 加密通信,地址格式应改为 pulsar+ssl://host:port。auth-plugin-class-name 和 token 用于配置认证信息,当 Pulsar 集群开启了认证功能时,客户端需要提供有效的认证信息才能连接到集群。operation-timeout 和 connection-timeout 分别设置了客户端操作和连接的超时时间,合理设置这些参数可以提高系统的稳定性和可靠性。
3.2 配置生产者(自定义主题与路由)
在 Spring Boot 中,我们可以通过 PulsarTemplate 来配置 Pulsar 生产者,并且可以使用 ProducerBuilderCustomizer 进行灵活的自定义配置。以下是一些常见的生产者配置示例:
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Service
public class PulsarProducerService {
@Autowired
private PulsarTemplate<byte[]> pulsarTemplate;
// 同步发送消息
public MessageId sendMessage(String topic, byte[] message) throws PulsarClientException {
return pulsarTemplate.send(topic, message);
}
// 使用ProducerBuilderCustomizer自定义生产者参数
public MessageId sendMessageWithProducerCustomizer(String topic, byte[] message, int timeout, TimeUnit unit, BatcherBuilder builder) throws PulsarClientException {
return pulsarTemplate.newMessage(message)
.withTopic(topic)
.withProducerCustomizer(pc -> {
pc.batcherBuilder(builder)
.sendTimeout(timeout, unit)
.enableBatching(true)
.batchingMaxMessages(10);
})
.send();
}
// 指定消息Key
public <T> MessageId sendMessageWithKey(String topic, T message, String key) throws PulsarClientException {
if (StringUtils.isEmpty(key)) {
return sendMessage(topic, (byte[]) message);
} else {
return pulsarTemplate.newMessage((byte[]) message)
.withTopic(topic)
.withMessageCustomizer(messageBuilder -> {
messageBuilder.key(key);
})
.send();
}
}
// 指定压缩类型
public <T> MessageId sendMessage(String topic, T message, CompressionType compressionType) throws PulsarClientException {
if (compressionType == null || compressionType == CompressionType.NONE) {
return sendMessage(topic, (byte[]) message);
} else {
return pulsarTemplate.newMessage((byte[]) message)
.withTopic(topic)
.withProducerCustomizer(producerBuilder -> {
producerBuilder.compressionType(compressionType);
})
.send();
}
}
// 发送包含自定义Header的消息
public MessageId sendMessageWithHeaders(String topic, byte[] message, MessageHeaders headers) throws PulsarClientException {
Message<byte[]> pulsarMessage = MessageBuilder.createMessage(message, headers);
return pulsarTemplate.send(topic, pulsarMessage);
}
// 异步发送消息
public CompletableFuture<MessageId> sendMessageAsync(String topic, byte[] message) {
return pulsarTemplate.sendAsync(topic, message);
}
// 发送消息时添加拦截器
@Bean
public ProducerInterceptor<byte[]> messageInterceptor() {
return new ProducerInterceptor<byte[]>() {
@Override
public Message<byte[]> beforeSend(String topic, Message<byte[]> message) {
// 可以在此处对消息进行预处理,例如添加时间戳等
MessageHeaders headers = message.getHeaders();
Message<byte[]> newMessage = MessageBuilder.createMessage(message.getPayload(), headers);
return newMessage;
}
@Override
public void onSendFailure(String topic, Message<byte[]> message, Throwable exception) {
// 处理发送失败的情况,例如记录日志
System.err.println("Message send failed: " + exception.getMessage());
}
@Override
public void onSendSuccess(String topic, Message<byte[]> message, MessageId messageId) {
// 处理发送成功的情况,例如记录日志
System.out.println("Message sent successfully: " + messageId);
}
};
}
}
3.3 配置消费者(订阅主题与重试策略)
在 Spring Boot 中,我们可以使用 @PulsarListener 注解来配置 Pulsar 消费者,同时可以在配置文件中设置一些消费者的相关参数。以下是配置消费者的示例:
在 application.yml 中,添加如下配置:
spring:
pulsar:
consumer:
topic: persistent://public/default/my-topic # 订阅的主题
subscriptionName: my-subscription # 订阅名称
subscriptionType: Exclusive # 订阅模式
negativeAckRedeliveryBackoff:
maxDelayMs: 60000 # 最大重试延迟时间
minDelayMs: 1000 # 最小重试延迟时间
multiplier: 2.0 # 重试延迟乘数
deadLetterPolicy:
maxRedeliverCount: 3 # 最大重试次数
deadLetterTopic: persistent://public/default/my-dead-letter-topic # 死信队列主题
编写消费者服务类:
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class PulsarConsumerService {
@Value("${spring.pulsar.consumer.topic}")
private String topic;
@Value("${spring.pulsar.consumer.subscriptionName}")
private String subscriptionName;
@Value("${spring.pulsar.consumer.subscriptionType}")
private SubscriptionType subscriptionType;
@Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.maxDelayMs}")
private long maxDelayMs;
@Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.minDelayMs}")
private long minDelayMs;
@Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.multiplier}")
private double multiplier;
@Value("${spring.pulsar.consumer.deadLetterPolicy.maxRedeliverCount}")
private int maxRedeliverCount;
@Value("${spring.pulsar.consumer.deadLetterPolicy.deadLetterTopic}")
private String deadLetterTopic;
@PulsarListener(topic = "${spring.pulsar.consumer.topic}", subscriptionName = "${spring.pulsar.consumer.subscriptionName}", subscriptionType = "${spring.pulsar.consumer.subscriptionType}")
public void receiveMessage(@Payload String message) {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
}
@Bean
public NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder()
.maxDelayMs(maxDelayMs)
.minDelayMs(minDelayMs)
.multiplier(multiplier)
.build();
}
@Bean
public DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliverCount)
.deadLetterTopic(deadLetterTopic)
.build();
}
}
4. 代码实战
4.1 编写生产者代码(发送消息)
使用 PulsarTemplate 发送消息,支持同步发送、异步发送、带 Schema 的消息发送,示例如下:
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
@Service
public class PulsarProducerService {
@Autowired
private PulsarTemplate<byte[]> pulsarTemplate;
// 同步消息发送
public MessageId sendMessage(String topic, byte[] message) throws Exception {
return pulsarTemplate.send(topic, message);
}
// 发送包含Schema的消息
public MessageId sendMessageWithSchema(String topic, GenericRecord message) throws Exception {
return pulsarTemplate.send(topic, message, Schema.AVRO(GenericRecord.class));
}
// 指定消息Key
public <T> MessageId sendMessageWithKey(String topic, T message, String key) throws Exception {
return pulsarTemplate.newMessage((byte[]) message)
.withTopic(topic)
.withMessageCustomizer(messageBuilder -> {
messageBuilder.key(key);
})
.send();
}
// 指定压缩类型
public <T> MessageId sendMessage(String topic, T message, CompressionType compressionType) throws Exception {
return pulsarTemplate.newMessage((byte[]) message)
.withTopic(topic)
.withProducerCustomizer(producerBuilder -> {
producerBuilder.compressionType(compressionType);
})
.send();
}
// 发送包含自定义Header的消息
public MessageId sendMessageWithHeaders(String topic, byte[] message, MessageHeaders headers) throws Exception {
Message<byte[]> pulsarMessage = MessageBuilder.createMessage(message, headers);
return pulsarTemplate.send(topic, pulsarMessage);
}
// 异步发送消息
public CompletableFuture<MessageId> sendMessageAsync(String topic, byte[] message) {
return pulsarTemplate.sendAsync(topic, message);
}
}
4.2 编写消费者代码(接收消息)
通过 @PulsarListener 注解监听主题,支持接收普通消息、结构化消息,示例如下:
import org.apache.pulsar.client.api.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class PulsarConsumerService {
@PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
public void receiveMessage(@Payload String message) {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
}
// 处理消息对象
@PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
public void receiveObjectMessage(@Payload MyMessageObject message) {
System.out.println("Received object message: " + message);
// 处理消息对象的业务逻辑
}
// 处理带自定义Header的消息
@PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
public void receiveMessageWithHeaders(Message<String> message) {
MessageHeaders headers = message.getHeaders();
String payload = message.getPayload();
System.out.println("Received message with headers: " + payload);
System.out.println("Headers: " + headers);
// 根据Header进行不同的业务逻辑处理
}
// 处理异常情况
@PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
public void receiveMessageWithExceptionHandling(@Payload String message) {
try {
System.out.println("Received message: " + message);
// 处理消息的业务逻辑
if (message.contains("error")) {
throw new RuntimeException("Message contains error");
}
// 处理成功
} catch (Exception e) {
// 处理异常,例如记录日志
System.err.println("Error processing message: " + e.getMessage());
// 根据配置进行重试或发送到死信队列
}
}
}
5. 高级特性
5.1 Schema 管理(结构化消息处理)
Pulsar 支持多种 Schema 类型(JSON、Avro、Protobuf、KeyValue 等),无需手动序列化 / 反序列化,示例如下:
以 JSON Schema 为例,假设我们有一个 User 类:
@Data
public class User {
private String id;
private String name;
private int age;
}
注册自定义 Schema:
import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.config.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.config.DefaultSchemaResolver;
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
};
}
在生产者中使用 Schema 发送消息:
@Service
public class PulsarProducerService {
@Autowired
private PulsarTemplate<User> pulsarTemplate;
public MessageId sendUserMessage(String topic, User user) throws Exception {
return pulsarTemplate.send(topic, user, Schema.JSON(User.class));
}
}
在消费者中使用 Schema 接收消息:
@Component
public class PulsarConsumerService {
@PulsarListener(topic = "user-topic", subscriptionName = "user-subscription", schemaType = SchemaType.JSON)
public void receiveUserMessage(@Payload User user) {
System.out.println("Received user message: " + user);
// 处理用户消息的业务逻辑
}
}
5.2 事务支持
在一些业务场景中,需要确保多条消息的发送要么全部成功,要么全部失败。Pulsar 提供了强大的事务支持。首先配置 PulsarTransactionManager:
@Bean
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
return new PulsarTransactionManager(pulsarClient);
}
在服务类中使用事务发送消息:
@Service
public class TransactionalService {
@Autowired
private PulsarTemplate<String> orderProducer;
@Autowired
private PulsarTemplate<String> paymentProducer;
@Transactional
public void processTransaction(String orderId, String paymentId) throws Exception {
// 在同一个事务内发送两条消息
orderProducer.send("order-topic", "Order: " + orderId);
// 模拟业务逻辑,如果支付ID为空则回滚
if (paymentId == null) {
throw new RuntimeException("Payment ID is null, transaction will rollback.");
}
paymentProducer.send("payment-topic", "Payment: " + paymentId);
}
}
5.3 消息路由策略
Pulsar 允许自定义消息路由策略,通过实现 MessageRouter 接口来控制消息的路由。例如,根据订单 ID 将消息发送到特定分区:
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;
public class OrderIdRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
String orderId = (String) msg.getKey();
int partitionCount = metadata.numPartitions();
// 根据订单ID的哈希值选择分区,确保同一订单的消息发送到相同分区
return Math.abs(orderId.hashCode()) % partitionCount;
}
}
在生产者中使用自定义路由策略:
@Service
public class OrderProducerService {
@Autowired
private PulsarClient pulsarClient;
@Bean
public Producer<String> orderProducer() throws Exception {
return pulsarClient.newProducer(Schema.STRING)
.topic("order-topic")
.messageRouter(new OrderIdRouter())
.create();
}
public MessageId sendOrderMessage(String orderId, String orderMessage) throws Exception {
Producer<String> producer = orderProducer();
return producer.newMessage()
.key(orderId)
.value(orderMessage)
.send();
}
}
本文小结
通过以上步骤,我们基于 SpringBoot 成功构建了 Apache Pulsar 实时应用。从环境准备、核心配置到代码实战,再到高级特性的应用,全面地展示了如何利用 Spring Boot 和 Apache Pulsar 的云原生架构优势,打造一个高效、可靠的实时数据处理系统。如果你正在为传统消息中间件的延迟、扩容或成本问题寻找解决方案,不妨尝试本文介绍的 SpringBoot 集成 Pulsar 方案。