找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1132

积分

0

好友

164

主题
发表于 4 天前 | 查看: 26| 回复: 0

在多年基于SparkStreaming的大数据流处理开发实践中,除了Kafka,Redis是使用最为频繁的组件。目前生产环境中维护着多个Redis集群,其中最大的一个32节点集群,其Key数量已达40亿,峰值QPS更是高达2000万。

在流处理架构中,Redis主要扮演两种角色:

  1. 作为离线更新的维表数据存储,用于丰富流数据的维度信息。
  2. 作为应用实时状态数据的存储。

无论哪种场景,最终在SparkStreaming中都需要与Redis进行频繁的getset交互。假设RDD的批处理间隔为1分钟,那么该窗口内的数据必须在1分钟内完成计算才算“无延迟”。当出现计算延迟时,如果问题不在与Redis的交互上,通常增加计算核心、内存或提高并行度即可解决。

然而,在开发一个数据量达1亿/分钟的SparkStreaming应用时,我们发现计算延迟的根源很可能在于与Redis的交互耗费了过多时间。此时,单纯增加计算资源收效甚微,必须从应用层与Redis的交互方式上进行深度优化。

应用层性能优化策略

很多时候,我们对已部署的Redis服务本身进行参数调优的空间有限,重启集群更是可能影响生产。因此,优化重点自然落在了应用侧代码上。

应用与Redis服务的数据交互过程大致如下图所示:
应用与Redis交互示意图

1. 使用连接池

在Java生态中,Jedis是与Redis交互的常用客户端。如果每次操作都新建一个Jedis连接,意味着每次都要经历TCP三次握手建立连接,关闭时又要四次挥手,内网环境下虽快,但在海量数据请求面前,这种开销不可忽视。因此,保持长连接是必要的。

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10);
poolConfig.setMinIdle(5);
poolConfig.setMaxIdle(8);
JedisPool jedisPool = new JedisPool(poolConfig, "localhost", 6379);
Jedis jedis = jedisPool.getResource();

Jedis连接池示意
调用close()方法时,会判断dataSource。如果为null则直接断开连接;若连接来自连接池,则会将Jedis实例归还给JedisPool
连接归还流程

2. 批处理 (Batch)

批处理是大数据领域的通用优化思路,其核心是通过减少通信次数来提升效率。

  • Kafka Producer:通过batch.size参数控制一次发送的消息条数,并结合linger.ms参数保证在数据量不足时的发送实时性。
    Kafka batch参数
    Kafka linger参数
  • Flume:配置文件中的batchSize参数同样用于控制sourcechannelsinkchannel取数据的批次大小。
    Flume batch参数

那么,我们能否将多个Redis的setget操作打包成一个批次提交呢?

3. Pipeline 管道技术

Jedis提供了Pipeline模式,允许一次性提交多个操作命令,然后通过手动执行sync()统一发送到Redis并获取结果。结合连接池,使用示例如下:

Jedis jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
Response<String> res2 = pipeline.get("bb");
// 此处也可以执行set、hmget等其他命令
pipeline.sync();
String s1 = res1.get();
String s2 = res2.get();

jedis.pipelined()创建管道到执行sync()发送命令期间,可以堆积多个不同的Redis命令。与直接使用Jedis返回String不同,Pipeline执行命令返回的是泛型为StringResponse对象。

Response数据回填机制
这与NIO中的Future概念相似。执行pipeline.get(key)后,命令并未立即发送,因此只能返回一个Response对象作为占位符。当执行sync()将请求真正发送到Redis后,返回的结果才会被回填到对应的Response对象中。
Pipeline数据回填示意
只有sync()之后,Response中才会有数据,可以通过调试直观看到这一过程。

调试验证
使用Docker启动一个四主四从的Redis Cluster(端口10001-10008)。
Redis Cluster启动
通过redis-cli(注意集群模式需加-c参数)设置两个Key。
Redis-cli设置Key
使用Jedis连接10001节点,并通过Pipeline获取Key。

Jedis jedis = new Jedis("121.91.xxx.xx", 10001);
jedis.auth("1qaz@WSX");
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
pipeline.sync();
String s1 = res1.get();

sync()处设置断点进行调试,可以看到执行前response.datanull,执行后其值被回填(‘1’的ASCII码为49)。
Debug断点查看Response
Debug查看回填后数据

在SparkStreaming开发中,通常将批次大小设置为256或512,即一次通过Pipeline提交256或512条命令。
Batch size选择
具体数值可根据返回Value的大小或请求类型进行评估。

然而,Pipeline的本质是与单个Redis实例建立一个通道进行批量通信,因此它仅适用于Jedis客户端。 要理解这一点,需要从Redis的常见架构说起。

Redis架构与Pipeline的适配挑战

Redis有单点、主从哨兵、集群等多种部署模式。在生产环境,尤其在大数据场景下,集群模式最为常见。通常所说的集群默认指Redis官方的Cluster模式,但在多年实践中,豌豆荚开源的Codis曾因其易用性备受青睐,可惜已停止维护。

为什么强调Pipeline仅针对Jedis?这需要从Redis的数据分布架构说起。Redis将所有数据划分为16384个槽位(slot),每个Key通过计算都会落在其中一个slot上。

单点模式

在单点模式下,所有16384个slot都在同一个节点上。使用Jedis直连该节点即可,Pipeline可以正常工作。

Jedis jedis = new Jedis("ip", 10001);
Pipeline pipeline = jedis.pipelined();
Response<String> res1 = pipeline.get("aa");
Response<String> res2 = pipeline.get("bb");
pipeline.sync();

记住关键点:Jedis客户端设计用于连接单个Redis服务实例。
单点模式示意

而在Cluster模式下,集群中的多个节点会均匀分摊这16384个slot。这就带来了问题:一个Key应该放在哪个slot?这个slot又在哪个节点上?这是集群模式使用Pipeline必须面对的问题。

Codis 架构

Codis架构分为Codis Proxy代理层Server存储层
Codis架构示意
Proxy层让整个Codis集群对外表现得像一个“大的单点Redis”,并实现了Redis协议,因此用户可以直接使用Jedis连接Proxy。底层的多个Redis实例(Server层)对用户透明,slot均匀分布在这些实例上,由Proxy负责计算Key的slot并路由到正确的Redis节点。用户无需关心细节,可以像使用单点Redis一样使用Jedis和Pipeline。

val poolConfig = new JedisPoolConfig
val poolx: JedisResourcePool = RoundRobinJedisPool.create()
  .curatorClient(redisHost, 30000)
  .zkProxyDir("/jodis/" + database)
  .poolConfig(poolConfig)
  .timeoutMs(60000)
  .build()
val jedis: Jedis = poolx.getResource
Redis Cluster 架构

与Codis类似,Redis Cluster也由多个实例组成,slot均匀分布。不同之处在于,Redis Cluster没有独立的Proxy层,存储层直接暴露给用户。如上所述,Jedis只能连接单个节点。
如果使用Jedis连接到Cluster中的某一个节点,那么它只能操作该节点所负责的slot上的Key。例如,Key aa可能位于10001节点,Key bb位于10003节点。用Jedis连接10001节点去获取bb,将会直接报错。
Jedis直连Cluster节点操作错误
因此,官方提供了JedisCluster客户端来操作整个集群。
JedisCluster示意

HashSet<HostAndPort> jedisClusterNodes = new HashSet<>();
jedisClusterNodes.add(new HostAndPort("121.91.xx.xxx", 10001));
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(-1);
config.setMaxIdle(-1);
config.setMaxWaitMillis(1000);
JedisCluster cluster = new JedisCluster(jedisClusterNodes, 1000, 1000, 10, "1qaz@WSX", config);
cluster.get("aa");
cluster.get("bb");

JedisCluster的工作原理:

  1. 连接管理JedisCluster在初始化时,其内部的connectionHandler会为集群中每个Redis节点创建一个JedisPool,并构建一个映射表,将16384个slot与对应的JedisPool关联起来。
    Slot与JedisPool映射
  2. Key路由:每次执行操作前,JedisCluster会通过JedisClusterCRC16.getSlot(key)计算Key所在的slot。
    计算Key的Slot
    然后调用getConnectionFromSlot(slot),根据slot从映射表中获取对应的JedisPool,进而拿到一个具体的Jedis连接来执行命令。
    根据Slot获取连接
    获取Slot对应的Pool

    完整路由流程

由于JedisCluster面向的是整个集群,并未向用户暴露底层单个Jedis对象,因此原生的JedisCluster不支持Pipeline操作。在处理每分钟上亿条数据的大数据场景时,与Redis Cluster的交互瓶颈成为了棘手的性能问题。

Redis Cluster的Pipeline替代方案与自定义实现

在从Codis迁移到Redis Cluster的初期,我们尝试了多种方案来缓解交互延迟:

  • Lettuce客户端:另一个流行的Redis客户端,支持异步操作,类似于Pipeline。

    RedisURI uri = RedisURI.builder()
            .withHost("47.102.xxx.xxx")
            .withPassword("1qaz@WSX".toCharArray())
            .withPort(10001)
            .build();
    RedisClusterClient client = RedisClusterClient.create(uri);
    StatefulRedisClusterConnection<String, String> connect = client.connect();
    RedisAdvancedClusterAsyncCommands<String, String> async = connect.async();
    async.set("key1", "v1");
    async.set("key2", "v2");
    async.flushCommands();
    Thread.sleep(1000 * 3);
    connect.close();
    client.shutdown();

    但经实测,其效率提升仍未达到理想状态。Redisson客户端测试结果也不甚理想。

  • 自定义JedisCluster Pipeline:最终,我们基于JedisCluster自行实现了一套Pipeline客户端,并在生产环境中取得了显著的性能提升。
    实现思路
    关键在于JedisCluster内部用于维护slot-节点映射的cache对象是protected修饰的,这意味着可以被继承访问。
    Protected的cache属性
    通过子类,我们可以获取到slot与JedisPool的映射关系(即slots数组)。
    获取slots映射
    基于此,我们建立了几层映射关系来实现针对每个Redis节点的Pipeline:

    1. Map<JedisPool, Jedis>:根据Key计算slot,找到对应的JedisPool,并从中取出(或创建)一个唯一的Jedis连接。确保一个Redis实例只有一个Jedis用于Pipeline。
    2. Map<Jedis, Pipeline>:确保每个Jedis实例只开启一个Pipeline,该实例上所有Key的操作都通过这个管道进行。
    3. Map<Pipeline, Integer>:记录每个Pipeline中累积的命令数量,当达到预设的批次阈值(如256)时,自动触发sync(),将命令批量发送至Redis。

    这一思路的核心是将发往不同Redis节点的命令,分别归集到与各节点对应的Pipeline中,最后统一提交。基于此理论,我们使用Java和Scala开发了适配SparkStreaming的JedisCluster Pipeline组件,有效解决了数据库/中间件交互瓶颈。

总结与展望

以上是笔者在多年大数据开发中积累的关于Redis使用的部分经验。除了连接池、批处理、Pipeline以及针对集群架构的客户端选型与自定义外,Redis的性能优化还有很多维度,例如Hash数据结构中Field与Value的合理设计、内存优化配置等。

近期,陪伴团队七年之久的Codis因无法通过升级修复安全漏洞而正式下线。尽管向Redis Cluster的迁移工作早已启动,此刻也不免感慨技术的迭代与变迁。性能优化之路永无止境,需要根据实际的业务场景与技术架构不断探索与实践。




上一篇:pytest结合allure-pytest实战:生成专业级自动化测试报告
下一篇:Agentic Search 技术解析:基于ReAct框架的智能检索实现原理
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 17:28 , Processed in 0.111921 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表