本文基于 Redis 8.6.0 与 redis-py 7.2.1,探索 RedisTimeSeries 模块的数据模型、核心操作、性能特性,并通过压测数据探查其性能表现。
1. 为什么选择 Redis TimeSeries?
RedisTimeSeries 是 Redis 内置的官方时间序列模块,它更适合作为 小型项目/轻量场景 的选择。当你已经在使用 Redis,希望在不引入额外时序中间件的前提下完成指标采集、短期留存与简单聚合分析时,它能以较低的工程成本快速落地。
需要明确的是:它并不一定在 健壮性、可观测性与极限性能 方面优于专业的时序数据库(如 Prometheus/InfluxDB/TimescaleDB 等)。在数据规模、查询复杂度、长期留存与运维治理要求更高的场景下,仍应结合业务目标做综合的选型评估。
它的核心优势包括:
- 原生时序数据结构:专为时间序列优化的存储格式,读写高效。
- 实时聚合能力:支持基于时间窗口的聚合、降采样等实时计算。
- 多维标签过滤:类似 Prometheus 的标签系统,支持灵活的多维度查询。
- Redis生态集成:无需额外部署中间件,与现有 Redis 集群无缝集成。
- 低引入成本:显著减少了系统组件数量与部署运维复杂度,但选用前仍需仔细评估数据容量与查询特征。
2. 环境准备与连接
2.1 测试环境
- Redis 版本: 8.6(开启 ACL 功能);
- 客户端: Python 3.8+ + redis-py 7.2.1;
- 测试目标:
192.168.37.37:6379;
- 认证方式: ACL 用户名/密码认证。
2.2 连接方式
Redis CLI 连接:
redis-cli -h 192.168.37.37 -p 6379 --user admin -a ‘你的密码’
Python 客户端初始化(推荐使用环境变量管理敏感信息):
import os
import redis
# 从环境变量读取配置,避免硬编码
client = redis.Redis(
host=os.getenv(“REDIS_HOST”, “192.168.37.37”),
port=int(os.getenv(“REDIS_PORT”, “6379”)),
username=os.getenv(“REDIS_USERNAME”, “admin”),
password=os.getenv(“REDIS_PASSWORD”, “”),
decode_responses=True, # 自动解码响应为字符串
)
client.ping() # 测试连接
ts = client.ts() # 获取 TimeSeries 客户端
提示: 生产环境建议使用 .env 文件或配置中心管理连接信息,避免密码硬编码泄露。
3. RedisTimeSeries数据模型详解
理解其数据模型是高效使用它的关键。每个时间序列由以下几个核心要素构成:
3.1 Key(键) - 序列的唯一标识
Key 是 Redis 中的标准键,用于唯一标识一条时间序列。良好的命名规范有助于管理和维护:
# 推荐命名模式:{业务}:{指标}:{维度}
monitor:cpu:app01 # 应用 CPU 使用率
monitor:latency_ms:p99 # P99 延迟指标
iot:sensor:temperature:001 # 物联网温度传感器
3.2 Timestamp(时间戳) - 数据点的时间维度
时间戳使用毫秒精度(Unix epoch 毫秒),支持精确的时间窗口查询:
# 显式指定时间戳
TS.ADD monitor:cpu:app01 1772242731127 63.5
# 使用 * 自动采用当前服务器时间
TS.ADD monitor:cpu:app01 * 63.5
3.3 Value(值) - 实际的测量值
支持整数和浮点数,适用于各种监控指标:
63.5 - CPU 使用率百分比
24.3 - 温度摄氏度
150 - 请求延迟毫秒
1024 - 内存使用量 MB
3.4 Labels(标签) - 多维度的元数据
标签是序列级别的元数据,用于多维度过滤和聚合查询:
TS.CREATE monitor:cpu:app01 LABELS metric cpu host app01 region cn-north
标签使用要点:
- 标签是键值对(
label_key=label_value);
- 支持多维度组合查询:
TS.MGET FILTER metric=cpu region=cn-north;
- 标签基数影响性能:避免创建过多具有唯一值的标签(如
user_id=12345)。
3.5 Retention(保留策略) - 数据生命周期管理
控制数据在内存中的保留时间,以平衡存储成本与查询需求:
# 保留 7 天数据(604800000 毫秒)
TS.CREATE monitor:cpu:app01 RETENTION 604800000
3.6 Duplicate Policy(重复策略) - 处理时间戳冲突
当向同一时间戳写入多次时,定义如何处理冲突:
# 常用策略:LAST(保留最后一次写入的值)
TS.CREATE monitor:cpu:app01 DUPLICATE_POLICY LAST
| 策略 |
说明 |
适用场景 |
BLOCK |
拒绝重复写入(默认) |
严格时序,不允许修正历史数据 |
LAST |
保留最后一次写入 |
监控数据修正场景 |
FIRST |
保留第一次写入 |
首次写入即正确的场景 |
MIN |
保留最小值 |
取最优值场景(如最低延迟) |
MAX |
保留最大值 |
峰值记录场景 |
3.7 Counter(计数器) - 累加型时序
特殊的时间序列类型,适用于计数场景,自动处理递增操作:
# 每次调用使计数器增加 1
TS.INCRBY monitor:requests:count 1
4. 核心操作实战
4.1 创建时间序列
命令行示例:
TS.CREATE monitor:cpu:app01 \
RETENTION 604800000 \
DUPLICATE_POLICY LAST \
LABELS metric cpu host app01 region cn-north unit percent
Python 代码示例:
def create_series(ts, prefix: str, run_id: str, series_count: int) -> list[str]:
“”“创建多条时间序列”“”
keys = []
for idx in range(series_count):
key = f”{prefix}:cpu:host{idx:04d}”
keys.append(key)
try:
ts.create(
key,
labels={
“metric”: “cpu”,
“region”: “cn-north”,
“host”: f”host{idx:04d}”,
“run_id”: run_id,
},
duplicate_policy=“last”,
)
except ResponseError as ex:
if “already exists” not in str(ex).lower():
raise
return keys
4.2 写入数据点
批量写入优化:使用 Pipeline 提升写入性能
def insert_worker(client, keys, points_per_series, batch_size, start_ms):
“”“并发写入工作线程”“”
pipe = client.pipeline(transaction=False)
pending = 0
for point_idx in range(points_per_series):
ts_ms = start_ms + point_idx * 1000
for key_idx, key in enumerate(keys):
value = float((point_idx % 100) + (key_idx % 20))
pipe.execute_command(“TS.ADD”, key, ts_ms, value)
pending += 1
if pending >= batch_size:
pipe.execute() # 批量提交
pending = 0
if pending > 0:
pipe.execute()
4.3 查询操作
4.3.1 单序列查询
# 获取序列最新一个数据点
TS.GET monitor:cpu:app01
# 查询时间范围内的所有数据点
TS.RANGE monitor:cpu:app01 - + # - 表示最小时间,+ 表示最大时间
# 倒序查询最近10个点
TS.REVRANGE monitor:cpu:app01 - + COUNT 10
# 获取序列元信息(标签、保留策略等)
TS.INFO monitor:cpu:app01
4.3.2 多序列查询(标签过滤)
# 查找匹配标签的所有序列 Key
TS.QUERYINDEX metric=cpu region=cn-north
# 获取各匹配序列的最新一个数据点(携带标签)
TS.MGET WITHLABELS FILTER metric=cpu region=cn-north
# 获取时间窗口内所有匹配序列的数据点(携带标签)
TS.MRANGE - + WITHLABELS FILTER metric=cpu region=cn-north
Python 实现示例:
# MGET:获取所有匹配序列的最新点
latest = ts.mget(
filters=[“metric=cpu”, “region=cn-north”, f”run_id={run_id}”],
with_labels=True
)
# MRANGE:获取时间窗口内的聚合数据
result = ts.mrange(
from_time=now_ms - 90_000,
to_time=now_ms,
filters=[“metric=cpu”, “region=cn-north”, f”run_id={run_id}”],
aggregation_type=“avg”,
bucket_size_msec=60_000,
with_labels=True
)
4.4 聚合与降采样
4.4.1 实时聚合查询
# 按1分钟时间桶计算平均值
TS.RANGE monitor:cpu:app01 - + AGGREGATION avg 60000
# 多序列聚合查询
TS.MRANGE - + AGGREGATION avg 60000 FILTER metric=cpu region=cn-north
4.4.2 持久化聚合规则(降采样)
# 创建原始序列和用于存储聚合结果的序列
TS.CREATE monitor:latency:raw
TS.CREATE monitor:latency:avg_1m
# 创建聚合规则:将原始数据实时聚合为1分钟平均值,写入新序列
TS.CREATERULE monitor:latency:raw monitor:latency:avg_1m AGGREGATION avg 60000
应用场景:
- 高频原始数据保留短期(如24小时),用于精细问题排查;
- 降采样后的数据保留长期(如30天),用于历史趋势分析;
- 有效减少存储空间占用,同时保持宏观分析能力。
5. 性能压测与结果分析
为了验证 RedisTimeSeries 在实际生产环境中的表现,我们设计了一个简单的压测方案,模拟了不同并发度、数据量和批量大小下的操作。
5.1 压测设计
压测模拟了真实的监控数据上报与查询场景:
- 时序数量:100 - 300条(模拟中小规模应用监控)
- 批量大小:2000 - 5000点/批(优化网络往返开销)
- 并发写入线程:8个
- 并发查询线程:8个
- 数据点:每条时序写入1000个点
5.2 压测结果
以下是基于 Redis 8.6.0(网络延迟 < 1ms)的实际压测数据:
| 时序数 |
批量大小 |
写入线程 |
查询线程 |
点/时序 |
总点数 |
写入时间(s) |
写入速率(点/秒) |
RANGE P50(ms) |
RANGE P95(ms) |
MGET P50(ms) |
MGET P95(ms) |
MRANGE P50(ms) |
MRANGE P95(ms) |
| 100 |
2000 |
8 |
8 |
1000 |
100,000 |
3.739 |
26,747.04 |
45.859 |
76.591 |
37.392 |
72.839 |
32.946 |
67.401 |
| 100 |
5000 |
8 |
8 |
1000 |
100,000 |
3.657 |
27,344.97 |
46.779 |
72.093 |
36.832 |
67.324 |
38.364 |
68.605 |
| 300 |
2000 |
8 |
8 |
1000 |
300,000 |
12.048 |
24,900.34 |
84.240 |
104.805 |
83.524 |
107.299 |
84.079 |
103.398 |
| 300 |
5000 |
8 |
8 |
1000 |
300,000 |
11.795 |
25,433.93 |
87.905 |
112.001 |
86.361 |
119.796 |
80.503 |
110.167 |
5.3 性能分析
5.3.1 写入性能
- 吞吐量: 稳定在 24,900 - 27,345 点/秒。
- 批量优化: 批量大小从 2000 增加到 5000,写入性能约有 2-4% 的提升。
- 扩展性: 时序数量从 100 条增加到 300 条(3倍),总吞吐量仅下降约 7%,线性扩展表现良好。
5.3.2 查询性能
- 单序列范围查询 (RANGE): P50 延迟 45-88ms,P95 延迟 72-112ms。
- 多序列最新点查询 (MGET): P50 延迟 37-86ms,P95 延迟 67-120ms。
- 多序列聚合查询 (MRANGE): P50 延迟 33-85ms,P95 延迟 67-110ms。
5.3.3 关键发现
- 批量写入能显著提升性能:使用 Pipeline 且每批 5000 个点相比 2000 个点有稳定的性能增益。
- 查询延迟与数据量正相关:更多的时序数会导致查询延迟增加,但在测试规模内仍处于可接受范围。
- 并发处理能力稳定:在 8 个并发线程同时进行读写操作时,性能表现平稳,未见剧烈抖动。
- 内存使用较为高效:存储约 30 万个数据点(300时序 * 1000点),内存占用约在 150MB 左右。
5.4 压测代码示例
def benchmark_concurrent_insert(client, keys, points_per_series, batch_size, writer_threads):
“”“并发写入压测函数”“”
key_groups = split_evenly(keys, writer_threads)
start_ms = int(time.time() * 1000)
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(key_groups)) as pool:
futures = [
pool.submit(insert_worker, client, group, points_per_series, batch_size, start_ms)
for group in key_groups
]
written_points = sum(f.result() for f in futures)
elapsed = time.perf_counter() - t0
points_per_sec = written_points / elapsed if elapsed > 0 else 0.0
return elapsed, points_per_sec, written_points
6. 实践建议与注意事项
6.1 命名规范
# 推荐:{业务}:{指标}:{维度}:{实例}
monitor:cpu:host:app01
monitor:memory:container:service-a
iot:temperature:sensor:room-101
# 避免:过于简单或复杂的命名
cpu # 太简单,易产生键冲突
monitor:production:us-east-1:app:v2:api:latency:p99 # 太复杂,不易管理
6.2 标签设计原则
- 严格控制标签基数:避免使用像
user_id=12345 这类可能产生海量唯一值的高基数标签,这会影响过滤查询的性能。
- 使用稳定的通用维度:如
region, env, service, instance, cluster 等。
- 预定义标签值枚举:尽量提前确定标签的可能取值,避免运行时动态生成。
6.3 性能优化技巧
- 坚持批量写入:使用客户端 Pipeline,批量大小建议在 1000 - 5000 点之间。
- 合理设置数据保留时间 (Retention):根据业务实际查询需求设置,避免不必要的内存占用。
- 善用聚合规则 (Rule):对高频写入的原始数据创建降采样规则,节省存储空间并加速历史范围查询。
- 管理好连接池:复用 Redis 连接,避免频繁创建和销毁连接带来的开销。
希望以上关于 RedisTimeSeries 的解析、实战和性能数据能为你提供有价值的参考。在实际项目中,不妨结合你的具体场景进行测试。如果你有更多使用心得或疑问,欢迎来 云栈社区 与大家交流探讨。