找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3655

积分

0

好友

511

主题
发表于 14 小时前 | 查看: 5| 回复: 0

本文基于 Redis 8.6.0 与 redis-py 7.2.1,探索 RedisTimeSeries 模块的数据模型、核心操作、性能特性,并通过压测数据探查其性能表现。

1. 为什么选择 Redis TimeSeries?

RedisTimeSeries 是 Redis 内置的官方时间序列模块,它更适合作为 小型项目/轻量场景 的选择。当你已经在使用 Redis,希望在不引入额外时序中间件的前提下完成指标采集、短期留存与简单聚合分析时,它能以较低的工程成本快速落地。

需要明确的是:它并不一定在 健壮性、可观测性与极限性能 方面优于专业的时序数据库(如 Prometheus/InfluxDB/TimescaleDB 等)。在数据规模、查询复杂度、长期留存与运维治理要求更高的场景下,仍应结合业务目标做综合的选型评估。

它的核心优势包括:

  • 原生时序数据结构:专为时间序列优化的存储格式,读写高效。
  • 实时聚合能力:支持基于时间窗口的聚合、降采样等实时计算。
  • 多维标签过滤:类似 Prometheus 的标签系统,支持灵活的多维度查询。
  • Redis生态集成:无需额外部署中间件,与现有 Redis 集群无缝集成。
  • 低引入成本:显著减少了系统组件数量与部署运维复杂度,但选用前仍需仔细评估数据容量与查询特征。

2. 环境准备与连接

2.1 测试环境

  • Redis 版本: 8.6(开启 ACL 功能);
  • 客户端: Python 3.8+ + redis-py 7.2.1;
  • 测试目标: 192.168.37.37:6379
  • 认证方式: ACL 用户名/密码认证。

2.2 连接方式

Redis CLI 连接

redis-cli -h 192.168.37.37 -p 6379 --user admin -a ‘你的密码’

Python 客户端初始化(推荐使用环境变量管理敏感信息):

import os
import redis

# 从环境变量读取配置,避免硬编码
client = redis.Redis(
    host=os.getenv(“REDIS_HOST”, “192.168.37.37”),
    port=int(os.getenv(“REDIS_PORT”, “6379”)),
    username=os.getenv(“REDIS_USERNAME”, “admin”),
    password=os.getenv(“REDIS_PASSWORD”, “”),
    decode_responses=True,  # 自动解码响应为字符串
)
client.ping()  # 测试连接
ts = client.ts()  # 获取 TimeSeries 客户端

提示: 生产环境建议使用 .env 文件或配置中心管理连接信息,避免密码硬编码泄露。

3. RedisTimeSeries数据模型详解

理解其数据模型是高效使用它的关键。每个时间序列由以下几个核心要素构成:

3.1 Key(键) - 序列的唯一标识

Key 是 Redis 中的标准键,用于唯一标识一条时间序列。良好的命名规范有助于管理和维护:

# 推荐命名模式:{业务}:{指标}:{维度}
monitor:cpu:app01          # 应用 CPU 使用率
monitor:latency_ms:p99     # P99 延迟指标
iot:sensor:temperature:001 # 物联网温度传感器

3.2 Timestamp(时间戳) - 数据点的时间维度

时间戳使用毫秒精度(Unix epoch 毫秒),支持精确的时间窗口查询:

# 显式指定时间戳
TS.ADD monitor:cpu:app01 1772242731127 63.5

# 使用 * 自动采用当前服务器时间
TS.ADD monitor:cpu:app01 * 63.5

3.3 Value(值) - 实际的测量值

支持整数和浮点数,适用于各种监控指标:

  • 63.5 - CPU 使用率百分比
  • 24.3 - 温度摄氏度
  • 150  - 请求延迟毫秒
  • 1024 - 内存使用量 MB

3.4 Labels(标签) - 多维度的元数据

标签是序列级别的元数据,用于多维度过滤和聚合查询:

TS.CREATE monitor:cpu:app01 LABELS metric cpu host app01 region cn-north

标签使用要点

  • 标签是键值对(label_key=label_value);
  • 支持多维度组合查询:TS.MGET FILTER metric=cpu region=cn-north
  • 标签基数影响性能:避免创建过多具有唯一值的标签(如 user_id=12345)。

3.5 Retention(保留策略) - 数据生命周期管理

控制数据在内存中的保留时间,以平衡存储成本与查询需求:

# 保留 7 天数据(604800000 毫秒)
TS.CREATE monitor:cpu:app01 RETENTION 604800000

3.6 Duplicate Policy(重复策略) - 处理时间戳冲突

当向同一时间戳写入多次时,定义如何处理冲突:

# 常用策略:LAST(保留最后一次写入的值)
TS.CREATE monitor:cpu:app01 DUPLICATE_POLICY LAST
策略 说明 适用场景
BLOCK 拒绝重复写入(默认) 严格时序,不允许修正历史数据
LAST 保留最后一次写入 监控数据修正场景
FIRST 保留第一次写入 首次写入即正确的场景
MIN 保留最小值 取最优值场景(如最低延迟)
MAX 保留最大值 峰值记录场景

3.7 Counter(计数器) - 累加型时序

特殊的时间序列类型,适用于计数场景,自动处理递增操作:

# 每次调用使计数器增加 1
TS.INCRBY monitor:requests:count 1

4. 核心操作实战

4.1 创建时间序列

命令行示例

TS.CREATE monitor:cpu:app01 \
  RETENTION 604800000 \
  DUPLICATE_POLICY LAST \
  LABELS metric cpu host app01 region cn-north unit percent

Python 代码示例

def create_series(ts, prefix: str, run_id: str, series_count: int) -> list[str]:
    “”“创建多条时间序列”“”
    keys = []
    for idx in range(series_count):
        key = f”{prefix}:cpu:host{idx:04d}”
        keys.append(key)
        try:
            ts.create(
                key,
                labels={
                    “metric”: “cpu”,
                    “region”: “cn-north”,
                    “host”: f”host{idx:04d}”,
                    “run_id”: run_id,
                },
                duplicate_policy=“last”,
            )
        except ResponseError as ex:
            if “already exists” not in str(ex).lower():
                raise
    return keys

4.2 写入数据点

批量写入优化:使用 Pipeline 提升写入性能

def insert_worker(client, keys, points_per_series, batch_size, start_ms):
    “”“并发写入工作线程”“”
    pipe = client.pipeline(transaction=False)
    pending = 0

    for point_idx in range(points_per_series):
        ts_ms = start_ms + point_idx * 1000
        for key_idx, key in enumerate(keys):
            value = float((point_idx % 100) + (key_idx % 20))
            pipe.execute_command(“TS.ADD”, key, ts_ms, value)
            pending += 1

        if pending >= batch_size:
            pipe.execute()  # 批量提交
            pending = 0

    if pending > 0:
        pipe.execute()

4.3 查询操作

4.3.1 单序列查询

# 获取序列最新一个数据点
TS.GET monitor:cpu:app01

# 查询时间范围内的所有数据点
TS.RANGE monitor:cpu:app01 - +  # - 表示最小时间,+ 表示最大时间

# 倒序查询最近10个点
TS.REVRANGE monitor:cpu:app01 - + COUNT 10

# 获取序列元信息(标签、保留策略等)
TS.INFO monitor:cpu:app01

4.3.2 多序列查询(标签过滤)

# 查找匹配标签的所有序列 Key
TS.QUERYINDEX metric=cpu region=cn-north

# 获取各匹配序列的最新一个数据点(携带标签)
TS.MGET WITHLABELS FILTER metric=cpu region=cn-north

# 获取时间窗口内所有匹配序列的数据点(携带标签)
TS.MRANGE - + WITHLABELS FILTER metric=cpu region=cn-north

Python 实现示例

# MGET:获取所有匹配序列的最新点
latest = ts.mget(
    filters=[“metric=cpu”, “region=cn-north”, f”run_id={run_id}”],
    with_labels=True
)

# MRANGE:获取时间窗口内的聚合数据
result = ts.mrange(
    from_time=now_ms - 90_000,
    to_time=now_ms,
    filters=[“metric=cpu”, “region=cn-north”, f”run_id={run_id}”],
    aggregation_type=“avg”,
    bucket_size_msec=60_000,
    with_labels=True
)

4.4 聚合与降采样

4.4.1 实时聚合查询

# 按1分钟时间桶计算平均值
TS.RANGE monitor:cpu:app01 - + AGGREGATION avg 60000

# 多序列聚合查询
TS.MRANGE - + AGGREGATION avg 60000 FILTER metric=cpu region=cn-north

4.4.2 持久化聚合规则(降采样)

# 创建原始序列和用于存储聚合结果的序列
TS.CREATE monitor:latency:raw
TS.CREATE monitor:latency:avg_1m

# 创建聚合规则:将原始数据实时聚合为1分钟平均值,写入新序列
TS.CREATERULE monitor:latency:raw monitor:latency:avg_1m AGGREGATION avg 60000

应用场景

  • 高频原始数据保留短期(如24小时),用于精细问题排查;
  • 降采样后的数据保留长期(如30天),用于历史趋势分析;
  • 有效减少存储空间占用,同时保持宏观分析能力。

5. 性能压测与结果分析

为了验证 RedisTimeSeries 在实际生产环境中的表现,我们设计了一个简单的压测方案,模拟了不同并发度、数据量和批量大小下的操作。

5.1 压测设计

压测模拟了真实的监控数据上报与查询场景:

  • 时序数量:100 - 300条(模拟中小规模应用监控)
  • 批量大小:2000 - 5000点/批(优化网络往返开销)
  • 并发写入线程:8个
  • 并发查询线程:8个
  • 数据点:每条时序写入1000个点

5.2 压测结果

以下是基于 Redis 8.6.0(网络延迟 < 1ms)的实际压测数据:

时序数 批量大小 写入线程 查询线程 点/时序 总点数 写入时间(s) 写入速率(点/秒) RANGE P50(ms) RANGE P95(ms) MGET P50(ms) MGET P95(ms) MRANGE P50(ms) MRANGE P95(ms)
100 2000 8 8 1000 100,000 3.739 26,747.04 45.859 76.591 37.392 72.839 32.946 67.401
100 5000 8 8 1000 100,000 3.657 27,344.97 46.779 72.093 36.832 67.324 38.364 68.605
300 2000 8 8 1000 300,000 12.048 24,900.34 84.240 104.805 83.524 107.299 84.079 103.398
300 5000 8 8 1000 300,000 11.795 25,433.93 87.905 112.001 86.361 119.796 80.503 110.167

5.3 性能分析

5.3.1 写入性能

  • 吞吐量: 稳定在 24,900 - 27,345 点/秒。
  • 批量优化: 批量大小从 2000 增加到 5000,写入性能约有 2-4% 的提升。
  • 扩展性: 时序数量从 100 条增加到 300 条(3倍),总吞吐量仅下降约 7%,线性扩展表现良好。

5.3.2 查询性能

  • 单序列范围查询 (RANGE): P50 延迟 45-88ms,P95 延迟 72-112ms。
  • 多序列最新点查询 (MGET): P50 延迟 37-86ms,P95 延迟 67-120ms。
  • 多序列聚合查询 (MRANGE): P50 延迟 33-85ms,P95 延迟 67-110ms。

5.3.3 关键发现

  1. 批量写入能显著提升性能:使用 Pipeline 且每批 5000 个点相比 2000 个点有稳定的性能增益。
  2. 查询延迟与数据量正相关:更多的时序数会导致查询延迟增加,但在测试规模内仍处于可接受范围。
  3. 并发处理能力稳定:在 8 个并发线程同时进行读写操作时,性能表现平稳,未见剧烈抖动。
  4. 内存使用较为高效:存储约 30 万个数据点(300时序 * 1000点),内存占用约在 150MB 左右。

5.4 压测代码示例

def benchmark_concurrent_insert(client, keys, points_per_series, batch_size, writer_threads):
    “”“并发写入压测函数”“”
    key_groups = split_evenly(keys, writer_threads)
    start_ms = int(time.time() * 1000)

    t0 = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(key_groups)) as pool:
        futures = [
            pool.submit(insert_worker, client, group, points_per_series, batch_size, start_ms)
            for group in key_groups
        ]
        written_points = sum(f.result() for f in futures)
    elapsed = time.perf_counter() - t0

    points_per_sec = written_points / elapsed if elapsed > 0 else 0.0
    return elapsed, points_per_sec, written_points

6. 实践建议与注意事项

6.1 命名规范

# 推荐:{业务}:{指标}:{维度}:{实例}
monitor:cpu:host:app01
monitor:memory:container:service-a
iot:temperature:sensor:room-101

# 避免:过于简单或复杂的命名
cpu  # 太简单,易产生键冲突
monitor:production:us-east-1:app:v2:api:latency:p99  # 太复杂,不易管理

6.2 标签设计原则

  • 严格控制标签基数:避免使用像 user_id=12345 这类可能产生海量唯一值的高基数标签,这会影响过滤查询的性能。
  • 使用稳定的通用维度:如 region, env, service, instance, cluster 等。
  • 预定义标签值枚举:尽量提前确定标签的可能取值,避免运行时动态生成。

6.3 性能优化技巧

  • 坚持批量写入:使用客户端 Pipeline,批量大小建议在 1000 - 5000 点之间。
  • 合理设置数据保留时间 (Retention):根据业务实际查询需求设置,避免不必要的内存占用。
  • 善用聚合规则 (Rule):对高频写入的原始数据创建降采样规则,节省存储空间并加速历史范围查询。
  • 管理好连接池:复用 Redis 连接,避免频繁创建和销毁连接带来的开销。

希望以上关于 RedisTimeSeries 的解析、实战和性能数据能为你提供有价值的参考。在实际项目中,不妨结合你的具体场景进行测试。如果你有更多使用心得或疑问,欢迎来 云栈社区 与大家交流探讨。




上一篇:剖析ABP框架AggregateRoot与FullAuditedEntity区别:.NET Core开发核心选择
下一篇:Nginx安全防护实战指南:从基础加固到WAF规则编写的全流程
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-1 20:21 , Processed in 0.399503 second(s), 43 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表