找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

590

积分

0

好友

82

主题
发表于 前天 05:48 | 查看: 6| 回复: 0

当Kafka在许多场景下努力应对高吞吐需求时,Apache Pulsar凭借其云原生架构,已在实践中展现了处理千万级消息与毫秒级延迟的潜力。本文将详细指导你如何使用SpringBoot构建基于Pulsar的实时应用,并深入分析其相较于传统方案的性能优势。

1. 技术概述与核心对比

1.1 Apache Pulsar 简介

Apache Pulsar 是 Apache 软件基金会顶级项目,定位为 “统一消息与流处理平台”,采用计算与存储分离的云原生架构,天然支持多租户、多消息模型(队列 / 流)、跨地域复制等核心特性。其核心优势在于:

  • 架构灵活:Broker 仅负责计算转发,存储由独立的 BookKeeper 集群承担,支持弹性扩缩容;
  • 功能全面:同时支持消息队列(MQ)和流处理(Stream),无需集成多个组件;
  • 兼容性强:兼容 Kafka 客户端协议,可无缝迁移 Kafka 应用;
  • 运维友好:支持分层存储(冷热数据分离)、自动负载均衡、跨区域复制。

1.2 Pulsar vs Kafka:核心差异与 Kafka 痛点解析

1.2.1 Kafka 的核心缺点

作为传统消息流平台的代表,Kafka 在实际生产中存在诸多难以规避的问题:

  1. 架构耦合严重:计算(Broker)与存储(日志分区)深度绑定,扩容时需迁移分区数据,操作复杂且易引发性能抖动;
  2. 存储效率低下:仅支持本地磁盘存储,冷热数据无法分离,长期存储成本高,且不支持数据分层归档;
  3. 消息模型单一:仅支持流模型(基于订阅的日志消费),如需实现队列模式(点对点、发布订阅)需额外开发封装;
  4. 跨地域复制弱:依赖 MirrorMaker 工具实现跨集群同步,延迟高、可靠性差,且不支持双向复制;
  5. 运维复杂度高:分区迁移、负载均衡需手动操作,集群扩容易导致数据倾斜,监控告警体系不完善。
1.2.2 Pulsar 的核心优势
  1. 计算存储分离:Broker 无状态,扩容时无需迁移数据,BookKeeper 集群独立扩容,支持 PB 级数据存储;
  2. 多消息模型统一:原生支持队列(Queue)、流(Stream)、分区主题(Partitioned Topic),无需额外适配;
  3. 分层存储:支持将冷数据自动迁移至 S3、GCS 等对象存储,存储成本降低 70% 以上;
  4. 强跨地域复制:内置跨集群复制功能,支持同步 / 异步复制策略,延迟低至毫秒级,且支持双向复制;
  5. 兼容性与生态完善:兼容 Kafka、AMQP、MQTT 等客户端协议,可直接复用现有客户端代码,同时集成 Flink、Spark 等流处理框架。
1.2.3 性能对比实测(基于相同硬件环境)

为验证两者性能差异,我们搭建了相同配置的测试环境(3 节点 Broker、3 节点存储集群,每节点 8 核 16G 内存、1TB SSD),测试场景为 “生产 - 消费吞吐量”“端到端延迟”“百万级消息堆积后消费恢复速度”,结果如下:

测试指标 Kafka 3.5.0 Pulsar 3.1.0 优势方
单主题吞吐量(16 分区) 生产:85MB/s,消费:92MB/s 生产:110MB/s,消费:135MB/s Pulsar
端到端延迟(1KB 消息) 平均 25ms,峰值 120ms 平均 8ms,峰值 45ms Pulsar
百万消息堆积后消费延迟 恢复时间 18s,延迟峰值 300ms 恢复时间 3s,延迟峰值 60ms Pulsar
集群扩容后性能波动 波动持续 5-8 分钟 波动秒级 Pulsar
10 亿消息存储占用空间 约 800GB 约 320GB(开启分层存储) Pulsar

结论:Pulsar 在吞吐量、低延迟、堆积恢复、存储效率等核心指标上全面领先 Kafka,尤其适合高并发、大数据量、低延迟的实时应用场景。

2. 基于SpringBoot 快速构建一个 Pulsar 实时应用

2.1 环境准备:安装 Pulsar Standalone 集群(本地开发环境)

  1. 下载 Pulsar 二进制包:访问 Apache 官网( https://pulsar.apache.org/download/ ),选择最新稳定版,下载后解压;
  2. 启动 Standalone 集群(内置 BookKeeper 存储,无需额外配置):
    # 进入Pulsar解压目录
    cd apache-pulsar-4.1.0
    # 启动Standalone模式,默认端口6650(TCP协议)、8080(HTTP协议)
    bin/pulsar standalone
  3. 验证启动成功:访问 http://localhost:8080/admin/v2/clusters,返回集群信息即表示启动正常。

2.2 创建 Spring Boot 项目

  • 项目类型:Maven Project
  • Spring Boot 版本:3.0+(推荐 3.1.x)
  • 依赖选择:Spring Web、Spring Boot Starter Pulsar

2.3 引入 Maven 依赖

在项目 pom.xml 中添加 Pulsar 核心依赖(Spring Boot Starter Pulsar 已集成 Pulsar 客户端,无需额外引入):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 版本由Spring Boot父工程统一管理,无需手动指定 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-pulsar</artifactId>
</dependency>

3. 核心配置

3.1 配置 Pulsar 客户端

src/main/resources/application.yml 中添加 Pulsar 连接配置,支持单机和集群模式:

spring:
  pulsar:
    client:
      service-url: pulsar://127.0.0.1:6650 # Pulsar服务连接地址
      auth-plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken # 认证插件类名
      authentication:
        token: YOUR_PULSAR_TOKEN # token信息
      operation-timeout: 30s # 操作超时时间
      connection-timeout: 10s # 连接超时时间

service-url 指定了 Pulsar 服务的地址和端口,这是客户端连接到 Pulsar 集群的关键配置。如果 Pulsar 集群启用了 SSL 加密通信,地址格式应改为 pulsar+ssl://host:portauth-plugin-class-nametoken 用于配置认证信息,当 Pulsar 集群开启了认证功能时,客户端需要提供有效的认证信息才能连接到集群。operation-timeoutconnection-timeout 分别设置了客户端操作和连接的超时时间,合理设置这些参数可以提高系统的稳定性和可靠性。

3.2 配置生产者(自定义主题与路由)

在 Spring Boot 中,我们可以通过 PulsarTemplate 来配置 Pulsar 生产者,并且可以使用 ProducerBuilderCustomizer 进行灵活的自定义配置。以下是一些常见的生产者配置示例:

import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Service
public class PulsarProducerService {
    @Autowired
    private PulsarTemplate<byte[]> pulsarTemplate;

    // 同步发送消息
    public MessageId sendMessage(String topic, byte[] message) throws PulsarClientException {
        return pulsarTemplate.send(topic, message);
    }

    // 使用ProducerBuilderCustomizer自定义生产者参数
    public MessageId sendMessageWithProducerCustomizer(String topic, byte[] message, int timeout, TimeUnit unit, BatcherBuilder builder) throws PulsarClientException {
        return pulsarTemplate.newMessage(message)
               .withTopic(topic)
               .withProducerCustomizer(pc -> {
                    pc.batcherBuilder(builder)
                           .sendTimeout(timeout, unit)
                           .enableBatching(true)
                           .batchingMaxMessages(10);
                })
               .send();
    }

    // 指定消息Key
    public <T> MessageId sendMessageWithKey(String topic, T message, String key) throws PulsarClientException {
        if (StringUtils.isEmpty(key)) {
            return sendMessage(topic, (byte[]) message);
        } else {
            return pulsarTemplate.newMessage((byte[]) message)
                   .withTopic(topic)
                   .withMessageCustomizer(messageBuilder -> {
                        messageBuilder.key(key);
                    })
                   .send();
        }
    }

    // 指定压缩类型
    public <T> MessageId sendMessage(String topic, T message, CompressionType compressionType) throws PulsarClientException {
        if (compressionType == null || compressionType == CompressionType.NONE) {
            return sendMessage(topic, (byte[]) message);
        } else {
            return pulsarTemplate.newMessage((byte[]) message)
                   .withTopic(topic)
                   .withProducerCustomizer(producerBuilder -> {
                        producerBuilder.compressionType(compressionType);
                    })
                   .send();
        }
    }

    // 发送包含自定义Header的消息
    public MessageId sendMessageWithHeaders(String topic, byte[] message, MessageHeaders headers) throws PulsarClientException {
        Message<byte[]> pulsarMessage = MessageBuilder.createMessage(message, headers);
        return pulsarTemplate.send(topic, pulsarMessage);
    }

    // 异步发送消息
    public CompletableFuture<MessageId> sendMessageAsync(String topic, byte[] message) {
        return pulsarTemplate.sendAsync(topic, message);
    }

    // 发送消息时添加拦截器
    @Bean
    public ProducerInterceptor<byte[]> messageInterceptor() {
        return new ProducerInterceptor<byte[]>() {
            @Override
            public Message<byte[]> beforeSend(String topic, Message<byte[]> message) {
                // 可以在此处对消息进行预处理,例如添加时间戳等
                MessageHeaders headers = message.getHeaders();
                Message<byte[]> newMessage = MessageBuilder.createMessage(message.getPayload(), headers);
                return newMessage;
            }
            @Override
            public void onSendFailure(String topic, Message<byte[]> message, Throwable exception) {
                // 处理发送失败的情况,例如记录日志
                System.err.println("Message send failed: " + exception.getMessage());
            }
            @Override
            public void onSendSuccess(String topic, Message<byte[]> message, MessageId messageId) {
                // 处理发送成功的情况,例如记录日志
                System.out.println("Message sent successfully: " + messageId);
            }
        };
    }
}

3.3 配置消费者(订阅主题与重试策略)

在 Spring Boot 中,我们可以使用 @PulsarListener 注解来配置 Pulsar 消费者,同时可以在配置文件中设置一些消费者的相关参数。以下是配置消费者的示例:

application.yml 中,添加如下配置:

spring:
  pulsar:
    consumer:
      topic: persistent://public/default/my-topic # 订阅的主题
      subscriptionName: my-subscription # 订阅名称
      subscriptionType: Exclusive # 订阅模式
      negativeAckRedeliveryBackoff:
        maxDelayMs: 60000 # 最大重试延迟时间
        minDelayMs: 1000 # 最小重试延迟时间
        multiplier: 2.0 # 重试延迟乘数
      deadLetterPolicy:
        maxRedeliverCount: 3 # 最大重试次数
        deadLetterTopic: persistent://public/default/my-dead-letter-topic # 死信队列主题

编写消费者服务类:

import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class PulsarConsumerService {
    @Value("${spring.pulsar.consumer.topic}")
    private String topic;
    @Value("${spring.pulsar.consumer.subscriptionName}")
    private String subscriptionName;
    @Value("${spring.pulsar.consumer.subscriptionType}")
    private SubscriptionType subscriptionType;
    @Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.maxDelayMs}")
    private long maxDelayMs;
    @Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.minDelayMs}")
    private long minDelayMs;
    @Value("${spring.pulsar.consumer.negativeAckRedeliveryBackoff.multiplier}")
    private double multiplier;
    @Value("${spring.pulsar.consumer.deadLetterPolicy.maxRedeliverCount}")
    private int maxRedeliverCount;
    @Value("${spring.pulsar.consumer.deadLetterPolicy.deadLetterTopic}")
    private String deadLetterTopic;

    @PulsarListener(topic = "${spring.pulsar.consumer.topic}", subscriptionName = "${spring.pulsar.consumer.subscriptionName}", subscriptionType = "${spring.pulsar.consumer.subscriptionType}")
    public void receiveMessage(@Payload String message) {
        System.out.println("Received message: " + message);
        // 处理消息的业务逻辑
    }

    @Bean
    public NegativeAckRedeliveryBackoff negativeAckRedeliveryBackoff() {
        return MultiplierRedeliveryBackoff.builder()
               .maxDelayMs(maxDelayMs)
               .minDelayMs(minDelayMs)
               .multiplier(multiplier)
               .build();
    }

    @Bean
    public DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder()
               .maxRedeliverCount(maxRedeliverCount)
               .deadLetterTopic(deadLetterTopic)
               .build();
    }
}

4. 代码实战

4.1 编写生产者代码(发送消息)

使用 PulsarTemplate 发送消息,支持同步发送、异步发送、带 Schema 的消息发送,示例如下:

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class PulsarProducerService {
    @Autowired
    private PulsarTemplate<byte[]> pulsarTemplate;

    // 同步消息发送
    public MessageId sendMessage(String topic, byte[] message) throws Exception {
        return pulsarTemplate.send(topic, message);
    }

    // 发送包含Schema的消息
    public MessageId sendMessageWithSchema(String topic, GenericRecord message) throws Exception {
        return pulsarTemplate.send(topic, message, Schema.AVRO(GenericRecord.class));
    }

    // 指定消息Key
    public <T> MessageId sendMessageWithKey(String topic, T message, String key) throws Exception {
        return pulsarTemplate.newMessage((byte[]) message)
               .withTopic(topic)
               .withMessageCustomizer(messageBuilder -> {
                    messageBuilder.key(key);
                })
               .send();
    }

    // 指定压缩类型
    public <T> MessageId sendMessage(String topic, T message, CompressionType compressionType) throws Exception {
        return pulsarTemplate.newMessage((byte[]) message)
               .withTopic(topic)
               .withProducerCustomizer(producerBuilder -> {
                    producerBuilder.compressionType(compressionType);
                })
               .send();
    }

    // 发送包含自定义Header的消息
    public MessageId sendMessageWithHeaders(String topic, byte[] message, MessageHeaders headers) throws Exception {
        Message<byte[]> pulsarMessage = MessageBuilder.createMessage(message, headers);
        return pulsarTemplate.send(topic, pulsarMessage);
    }

    // 异步发送消息
    public CompletableFuture<MessageId> sendMessageAsync(String topic, byte[] message) {
        return pulsarTemplate.sendAsync(topic, message);
    }
}

4.2 编写消费者代码(接收消息)

通过 @PulsarListener 注解监听主题,支持接收普通消息、结构化消息,示例如下:

import org.apache.pulsar.client.api.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class PulsarConsumerService {
    @PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
    public void receiveMessage(@Payload String message) {
        System.out.println("Received message: " + message);
        // 处理消息的业务逻辑
    }

    // 处理消息对象
    @PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
    public void receiveObjectMessage(@Payload MyMessageObject message) {
        System.out.println("Received object message: " + message);
        // 处理消息对象的业务逻辑
    }

    // 处理带自定义Header的消息
    @PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
    public void receiveMessageWithHeaders(Message<String> message) {
        MessageHeaders headers = message.getHeaders();
        String payload = message.getPayload();
        System.out.println("Received message with headers: " + payload);
        System.out.println("Headers: " + headers);
        // 根据Header进行不同的业务逻辑处理
    }

    // 处理异常情况
    @PulsarListener(topic = "my-topic", subscriptionName = "my-subscription", subscriptionType = SubscriptionType.Exclusive)
    public void receiveMessageWithExceptionHandling(@Payload String message) {
        try {
            System.out.println("Received message: " + message);
            // 处理消息的业务逻辑
            if (message.contains("error")) {
                throw new RuntimeException("Message contains error");
            }
            // 处理成功
        } catch (Exception e) {
            // 处理异常,例如记录日志
            System.err.println("Error processing message: " + e.getMessage());
            // 根据配置进行重试或发送到死信队列
        }
    }
}

5. 高级特性

5.1 Schema 管理(结构化消息处理)

Pulsar 支持多种 Schema 类型(JSON、Avro、Protobuf、KeyValue 等),无需手动序列化 / 反序列化,示例如下:

以 JSON Schema 为例,假设我们有一个 User 类:

@Data
public class User {
    private String id;
    private String name;
    private int age;
}

注册自定义 Schema:

import org.apache.pulsar.client.api.Schema;
import org.springframework.context.annotation.Bean;
import org.springframework.pulsar.config.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.config.DefaultSchemaResolver;

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
    return (schemaResolver) -> {
        schemaResolver.addCustomSchemaMapping(User.class, Schema.JSON(User.class));
    };
}

在生产者中使用 Schema 发送消息:

@Service
public class PulsarProducerService {
    @Autowired
    private PulsarTemplate<User> pulsarTemplate;

    public MessageId sendUserMessage(String topic, User user) throws Exception {
        return pulsarTemplate.send(topic, user, Schema.JSON(User.class));
    }
}

在消费者中使用 Schema 接收消息:

@Component
public class PulsarConsumerService {
    @PulsarListener(topic = "user-topic", subscriptionName = "user-subscription", schemaType = SchemaType.JSON)
    public void receiveUserMessage(@Payload User user) {
        System.out.println("Received user message: " + user);
        // 处理用户消息的业务逻辑
    }
}

5.2 事务支持

在一些业务场景中,需要确保多条消息的发送要么全部成功,要么全部失败。Pulsar 提供了强大的事务支持。首先配置 PulsarTransactionManager

@Bean
public PulsarTransactionManager pulsarTransactionManager(PulsarClient pulsarClient) {
    return new PulsarTransactionManager(pulsarClient);
}

在服务类中使用事务发送消息:

@Service
public class TransactionalService {
    @Autowired
    private PulsarTemplate<String> orderProducer;
    @Autowired
    private PulsarTemplate<String> paymentProducer;

    @Transactional
    public void processTransaction(String orderId, String paymentId) throws Exception {
        // 在同一个事务内发送两条消息
        orderProducer.send("order-topic", "Order: " + orderId);
        // 模拟业务逻辑,如果支付ID为空则回滚
        if (paymentId == null) {
            throw new RuntimeException("Payment ID is null, transaction will rollback.");
        }
        paymentProducer.send("payment-topic", "Payment: " + paymentId);
    }
}

5.3 消息路由策略

Pulsar 允许自定义消息路由策略,通过实现 MessageRouter 接口来控制消息的路由。例如,根据订单 ID 将消息发送到特定分区:

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TopicMetadata;

public class OrderIdRouter implements MessageRouter {
    @Override
    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
        String orderId = (String) msg.getKey();
        int partitionCount = metadata.numPartitions();
        // 根据订单ID的哈希值选择分区,确保同一订单的消息发送到相同分区
        return Math.abs(orderId.hashCode()) % partitionCount;
    }
}

在生产者中使用自定义路由策略:

@Service
public class OrderProducerService {
    @Autowired
    private PulsarClient pulsarClient;

    @Bean
    public Producer<String> orderProducer() throws Exception {
        return pulsarClient.newProducer(Schema.STRING)
               .topic("order-topic")
               .messageRouter(new OrderIdRouter())
               .create();
    }

    public MessageId sendOrderMessage(String orderId, String orderMessage) throws Exception {
        Producer<String> producer = orderProducer();
        return producer.newMessage()
               .key(orderId)
               .value(orderMessage)
               .send();
    }
}

本文小结

通过以上步骤,我们基于 SpringBoot 成功构建了 Apache Pulsar 实时应用。从环境准备、核心配置到代码实战,再到高级特性的应用,全面地展示了如何利用 Spring Boot 和 Apache Pulsar 的云原生架构优势,打造一个高效、可靠的实时数据处理系统。如果你正在为传统消息中间件的延迟、扩容或成本问题寻找解决方案,不妨尝试本文介绍的 SpringBoot 集成 Pulsar 方案。




上一篇:美团HoMer:基于全景序列与集合式预测的统一推荐架构,CTR提升1.99%
下一篇:限价订单簿状态变化详解:从层次结构到多场景实战示例
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-11 02:47 , Processed in 0.082481 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表