找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3409

积分

0

好友

449

主题
发表于 1 小时前 | 查看: 2| 回复: 0

一、故障现场:为什么一条慢 SQL 能演变成系统性事故

先看一个非常典型的真实场景模型。

凌晨 2:17,电商订单服务报警:

  • 下单接口 P99 从 180ms 飙到 11.8s
  • MySQL 活跃连接数逼近上限
  • 应用层线程池队列堆积
  • 网关 5xx 上升,订单成功率跌破 82%

故障根因最后定位为一条 SQL:

SELECT o.id, o.user_id, o.amount, u.level
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.status = 'PAYING'
  AND o.created_at >= NOW() - INTERVAL 1 DAY
ORDER BY o.created_at DESC
LIMIT 1000;

一条 3 秒的 SQL,不会只让数据库多忙 3 秒。它可能顺着连接池、线程池、缓存击穿、重试风暴一路放大,最终把一个看起来“只是有点慢”的问题演化成整条业务链路的雪崩事故。真正危险的,从来不是某一条慢 SQL,而是团队没有能力在分钟级发现它、定位它、量化它、治理它。

问题并不只是“它慢”,而是它在事故中被放大成了链式反应:

  1. 新版本上线后,某个分页查询漏掉了复合索引。
  2. SQL 扫描行数从几百行飙到几百万行,单次耗时从 40ms 上升到 3s。
  3. 应用连接池中的连接被长时间占用,请求开始排队。
  4. 上游服务设置了重试,导致数据库压力进一步放大。
  5. 热点 key 缓存失效,读流量回源数据库。
  6. 最终 MySQL 连接数打满,业务出现大面积超时。

这里最关键的结论是:

慢 SQL 并不是孤立问题,而是高并发系统中的放大器。

因此,一个真正有价值的慢日志平台,不应只回答“哪条 SQL 慢”,还要回答下面几个更关键的问题:

  • 它是哪个业务、哪个租户、哪个实例触发的?
  • 它是偶发抖动,还是同类 SQL 在 5 分钟窗口内持续恶化?
  • 它是锁等待问题、执行计划回退问题,还是扫描行数失控问题?
  • 它已经影响哪些上游应用指标?
  • 是否需要触发自动限流、熔断、工单、通知甚至 SQL Kill?

这就决定了平台的定位不能只是“日志收集”,而必须是一套可观测 + 流式处理 + 实时告警 + 治理闭环的工程系统。

很多团队都开了 MySQL 慢查询日志,但最后仍然在故障现场手忙脚乱:日志在不同节点、不同 Pod、不同实例里散落;文件大到打不开;分析依赖人工 greppt-query-digest;出了事只能靠 DBA 临时登录机器救火。这样的慢日志体系,本质上仍然是“事后取证工具”,而不是“实时治理平台”。

本文从架构师和工程落地的视角,完整升级这篇文章,目标不是“教你收集慢日志”,而是带你构建一套真正能在生产环境落地的、云原生的、分布式的 MySQL 慢日志采集分析平台:

  • 从 MySQL 慢日志原理讲透采集边界和性能影响
  • 从单实例文件分析升级到多集群、多租户、实时处理架构
  • 给出面向高并发的 Kafka + 流式消费 + 存储建模方案
  • 补全生产级代码、Kubernetes 配置、索引设计和告警闭环
  • 结合真实事故场景,说明平台如何从“可看见”走向“可止血”

如果你正在经历下面这些问题,这篇文章会非常贴近你的现场:

  • 应用 P99 飙升,但不知道是哪个库、哪条 SQL、哪个租户触发
  • 数据库实例多、分库分表多、跨可用区多,慢日志根本收不齐
  • 有监控,没有细粒度 SQL 指纹聚合,无法判断“同类 SQL 是否恶化”
  • 有日志,没有治理闭环,告警之后仍靠 DBA 手工分析

本文不会停留在“用 Filebeat 收一下日志”这种浅层方案,而是直接给出一套生产级设计。

二、先把底层吃透:MySQL 慢查询日志到底是怎么产生的

如果不了解慢日志的底层机制,平台设计很容易做错,比如阈值配得过低、采样策略不合理、字段不够、误以为所有性能问题都能从慢日志看到。

2.1 慢日志的判定时机

MySQL 并不是“执行前预判”某条 SQL 会不会慢,而是在语句执行完成后,根据实际执行统计决定是否写入慢日志。常见触发条件包括:

  • Query_time 超过 long_query_time
  • 开启 log_queries_not_using_indexes
  • 开启 log_slow_admin_statements
  • 某些增强发行版支持采样、过滤和限速

这意味着两件事:

  1. 慢日志是结果视角,不是过程视角。 它擅长告诉你“这条 SQL 最终花了多久”,但不擅长精细刻画执行中的每个阶段。
  2. 慢日志天然适合做稳定、低侵入的离线或准实时治理。 如果你要看更细粒度的等待事件、锁竞争、执行阶段耗时,还要结合 performance_schemaevents_statements_summary_by_digestINNODB_TRX、APM Trace 一起看。

2.2 关键参数与生产建议

下面这些参数,决定了你采集到的慢日志是否“既有价值,又不伤库”。

参数 建议值 说明
slow_query_log ON 打开慢日志
long_query_time 0.1 ~ 0.5 高频核心库通常从 0.2s0.3s 起步,必须压测评估
log_output FILE 生产首选,利于零侵入采集
log_queries_not_using_indexes 谨慎开启 容易导致日志洪峰
log_slow_admin_statements ON DDL、ANALYZE、OPTIMIZE 等管理语句也应关注
min_examined_row_limit 100 或更高 避免记录意义不大的小 SQL
log_timestamps UTC 跨区域采集时建议统一时区

更稳妥的生产经验是:

  • 核心交易库long_query_time 不要一上来就调到 0.01s,否则日志量会爆炸
  • 离线分析库:阈值可以更宽松,重点观察大扫描、大排序、大临时表
  • 高 QPS 场景:如果使用 Percona Server,优先启用慢日志采样和 rate limit

2.3 FILE、TABLE、Performance Schema 三种路径怎么选

很多文章一上来就说“慢日志写文件最方便”,这没错,但还不够。你需要知道三种能力分别适合什么场景。

方案一:log_output=FILE

优点:

  • 对业务最友好,外部采集器可直接追踪文件
  • 不侵入数据库元数据结构
  • 与容器 stdout、sidecar 采集天然兼容

缺点:

  • 本质是非结构化文本,需要解析
  • 字段有限,无法直接拿到完整执行计划

适合:

  • 绝大多数生产环境
  • 做日志平台的统一采集入口

方案二:log_output=TABLE

优点:

  • 可直接 SQL 查询
  • 对周期性统计场景更直接

缺点:

  • 写表意味着更高开销
  • 数据膨胀快,清理成本高
  • 在高写入时容易和业务资源竞争

适合:

  • 低流量环境的临时排查
  • 不建议作为大规模生产平台入口

方案三:performance_schema / digest summary

优点:

  • 有结构化聚合能力
  • 可直接看到 digest 级别统计
  • 适合做低粒度持续观测

缺点:

  • 不保留完整 SQL 文本明细
  • 更适合统计,不适合完整审计和长文本分析

适合:

  • 与慢日志平台配合使用
  • 做平台的“增强数据源”

结论很明确:

慢日志文件负责明细,Performance Schema 负责摘要,APM/Trace 负责上下文。三者结合,才是完整生产观测方案。

三、平台建设目标:不是“收上来”,而是“分钟级发现和治理”

在画架构图之前,先定义目标。没有目标的架构设计,最后一定演化成“大而全却难运营”的日志堆场。

这套平台的核心目标建议定成下面五个:

3.1 采集目标

  • 支持物理机、虚机、Kubernetes、云数据库代理节点等混合环境
  • 支持多实例、多租户、多集群统一接入
  • 单点故障不丢日志,至少做到“短时可回放”

3.2 分析目标

  • 在 1 分钟内完成从慢日志产生到平台可见
  • 能够基于 SQL 指纹做聚合,而不是只堆原始文本
  • 能区分“偶发单点慢”和“同类 SQL 系统性恶化”

3.3 治理目标

  • 支持阈值告警、同比/环比恶化告警
  • 能按业务、库、实例、租户、来源服务维度下钻
  • 具备与 Trace、CMDB、工单、通知系统联动能力

3.4 工程目标

  • 消息链路支持削峰填谷
  • 解析层可水平扩容
  • 存储冷热分层,避免 ES/ClickHouse 成为新瓶颈

3.5 运维目标

  • 平台自身可观测
  • 支持回放、补数、重算
  • 可以灰度上线和分阶段推广

你会发现,目标一旦明确,架构选择自然就清晰了:不能只有采集器和搜索引擎,还必须有消息总线、流式聚合层、标签体系和治理出口。

四、总体架构:从单机脚本升级为云原生分布式数据管道

下面给出一套生产中非常实用的分层架构。

                           ┌──────────────────────────────┐
                           │         运维治理层             │
                           │ Grafana / Kibana / Alerting   │
                           │ 工单 / 钉钉 / 自动限流 / Kill  │
                           └──────────────┬───────────────┘
                                          │
                           ┌──────────────▼───────────────┐
                           │        查询与服务层            │
                           │ Query API / 聚合 API / DSL    │
                           └──────────────┬───────────────┘
                                          │
                     ┌────────────────────┴────────────────────┐
                     │                                         │
           ┌──────────▼──────────┐                 ┌──────────▼──────────┐
           │      明细存储层      │                 │      聚合存储层      │
           │ Elasticsearch/OpenSearch │             │ ClickHouse / Doris   │
           └──────────┬──────────┘                 └──────────┬──────────┘
                     │                                         │
                     └────────────────────┬────────────────────┘
                                          │
                           ┌──────────────▼───────────────┐
                           │       流式处理与解析层         │
                           │ Go Consumer / Flink / Kafka  │
                           │ 解析 / 指纹 / 富化 / 聚合      │
                           └──────────────┬───────────────┘
                                          │
                           ┌──────────────▼───────────────┐
                           │          消息总线层            │
                           │ Kafka / Pulsar / Redpanda    │
                           └──────────────┬───────────────┘
                                          │
                ┌─────────────────────────┴─────────────────────────┐
                │                                                   │
      ┌─────────▼─────────┐                             ┌──────────▼──────────┐
      │     采集器层        │                             │      元数据层        │
      │ Filebeat / Fluent Bit │                           │ CMDB / K8s API      │
      │ DaemonSet / Sidecar   │                           │ 服务映射 / 租户映射  │
      └─────────┬─────────┘                             └─────────────────────┘
                │
      ┌─────────▼─────────┐
      │    MySQL 实例层    │
      │ 主从 / 分片 / 多集群 │
      └────────────────────┘

4.1 为什么要分层

很多团队一开始会尝试:

Filebeat -> Elasticsearch

这是能跑起来的,但当你进入中大规模场景时,问题会集中爆发:

  • ES 同时承担写入和实时计算,成本高、压力大
  • 没有缓冲层,流量峰值会直接打穿下游
  • 无法优雅地做重放、补数、规则重算
  • 很难扩展复杂分析,比如 digest 聚合、异常检测、租户维度聚类

因此,生产级设计通常演进为:

采集器 -> Kafka -> 解析/聚合 -> 明细与聚合双写 -> 服务/告警

4.2 组件职责划分

采集器层

负责:

  • 跟踪文件
  • 多行合并
  • 打基础标签
  • 将日志稳定推送到消息总线

不负责:

  • 重量级 SQL 解析
  • 聚合统计
  • 复杂告警逻辑

采集器只做“轻活”,这样才不会在节点侧引入过高资源成本。

消息总线层

核心价值不是“中转”,而是:

  • 削峰填谷
  • 解耦上下游扩缩容节奏
  • 保证短时堆积可恢复
  • 支持补消费和新消费者接入

解析与流处理层

这是整个平台最关键的智能核心,负责:

  • 解析慢日志文本
  • 生成 SQL 指纹
  • 富化业务标签
  • 聚合分钟级统计
  • 产出告警判定所需指标

存储层

不要只用一种存储。

  • 明细存储:用于排查和检索,适合 ES/OpenSearch
  • 聚合存储:用于高性价比统计和榜单分析,适合 ClickHouse/Doris

这是很多团队在一年后才意识到的关键分层:明细和聚合的访问模式完全不同,不应混在一起。

五、采集策略设计:DaemonSet、Sidecar、Agentless 怎么选

Kubernetes 场景中,采集策略决定了成本、复杂度和稳定性。

5.1 DaemonSet 模式

采集器以 DaemonSet 形式部署在每个节点,统一采集节点上的 MySQL 容器日志或宿主机挂载日志。

优点:

  • 运维成本低
  • 实例增减无需改 Pod 模板
  • 资源利用率高

缺点:

  • 需要日志路径和容器运行时行为稳定
  • 对一些自定义挂载路径不够灵活

适合:

  • MySQL 实例较多
  • 集群标准化程度高
  • 平台团队负责统一接入

5.2 Sidecar 模式

在每个 MySQL Pod 中放一个轻量采集容器,与主容器共享慢日志目录。

优点:

  • 与业务实例强绑定,隔离清晰
  • 易于针对单实例定制规则
  • 文件路径最直接

缺点:

  • Pod 资源成本更高
  • 大规模实例下 Sidecar 数量多

适合:

  • 数据库实例数量不大
  • 多团队多规格混用,接入标准不统一
  • 需要逐库灰度推广

5.3 Agentless 模式

通过数据库管理节点或共享存储统一拉取日志。

优点:

  • 改造最少

缺点:

  • 时效性差
  • 容易形成中心节点瓶颈
  • 不适合实时分析

结论很简单:

  • 云原生标准集群优先 DaemonSet
  • 强隔离或个性化场景优先 Sidecar
  • Agentless 只适合过渡期,不适合作为最终形态

六、核心设计点:高并发场景下,平台为什么还能稳住

文章写到这里,如果还只是“一个能用的架构”,那离生产级还有差距。真正的生产级升级,关键在于你是否提前考虑了高并发和可扩展性。

6.1 峰值流量怎么估算

先做一个非常实用的容量模型。

假设:

  • 200 个 MySQL 实例
  • 每实例高峰期每秒 80 条慢日志
  • 每条慢日志平均 2KB

则平台入口峰值约为:

  • 16,000 条/秒
  • 约 32MB/s 原始吞吐

如果开启 log_queries_not_using_indexes,或者某次故障导致同类 SQL 风暴式刷屏,峰值会轻松放大 3 到 10 倍。

所以你不能按“平时流量”设计,而要按“故障流量”设计。

6.2 Kafka 为什么是关键缓冲层

Kafka 在这里至少承担四个职责:

  • 平滑上游突增日志
  • 给下游解析器留出弹性扩容时间
  • 支持消费失败后重试
  • 为后续新增消费者保留历史数据窗口

生产建议:

  • Topic 按环境或业务域拆分,不要所有实例共用一个超大 Topic
  • 分区数按峰值和扩容上限预留,不要只按当前实例数配置
  • 保留策略至少 24 小时,关键环境可保留 72 小时
  • 使用压缩降低网络成本,snappylz4 通常更均衡

6.3 解析层必须无状态

解析服务不要把状态放在本地内存里依赖单实例持有,否则扩容、迁移、重启都会出问题。

正确设计是:

  • 单条消息解析无状态
  • 分钟级聚合状态放在流计算引擎或外部状态存储
  • 配置和规则中心化下发

这样解析层才能通过 Deployment + HPA 水平扩展。

6.4 明细与聚合双写是必须的

很多团队一开始只存明细,后面做榜单、趋势、聚类和 TopN 会非常痛苦。

推荐输出两类数据:

  1. 明细事件流: 每条慢日志一条记录,便于精确排查。
  2. 分钟聚合流:instance + db + sql_digest + minute_bucket 为主键,记录:
    • 次数
    • 平均耗时
    • P95/P99
    • 最大耗时
    • 平均扫描行数
    • 锁等待占比

这样大屏、报表、告警全都可以直接吃聚合层数据。

七、生产级数据模型:你要存的不只是 SQL 文本

一个做不下去的平台,往往不是采集链路先出问题,而是数据模型先把后续分析能力锁死了。

建议事件模型至少包含以下字段:

字段 类型 说明
event_time datetime 慢日志时间
cluster keyword 集群名
namespace keyword K8s 命名空间
instance keyword 实例地址或 Pod
db keyword 数据库名
user keyword 执行用户
client_ip ip 来源 IP
query_time_ms long 执行耗时毫秒
lock_time_ms long 锁等待毫秒
rows_sent long 返回行数
rows_examined long 扫描行数
tmp_tables integer 临时表数,可选
tmp_disk_tables integer 落盘临时表数,可选
sql_text text 原始 SQL
sql_digest keyword 指纹
sql_template text 模板化 SQL
trace_id keyword 应用注入的 trace id
service keyword 来源服务
tenant_id keyword 租户
env keyword 环境
tags object 扩展标签

几个特别重要的建模建议:

7.1 query_time 不要只存浮点秒

统一换算成毫秒整数,便于聚合和排序,减少精度歧义。

7.2 sql_textsql_digest 必须同时保留

  • sql_text 用于定位具体 SQL
  • sql_digest 用于聚合同类 SQL

只有全文,没有指纹,你无法做稳定统计。只有指纹,没有原文,排障体验又会很差。

7.3 标签体系决定平台上限

平台一旦缺少 servicetenant_idcluster 这类业务标签,最后就只能看到“某实例很慢”,却无法回答“到底是哪个业务导致的”。

日志平台做不出价值,很多时候不是技术不行,而是数据没有打通业务语义。

八、SQL 指纹设计:为什么不能只靠原始 SQL 去聚合

下面两条 SQL,本质上是一类问题:

SELECT * FROM orders WHERE user_id = 1001 AND status = 'PAYING';
SELECT * FROM orders WHERE user_id = 2009 AND status = 'PAYING';

如果你拿原始 SQL 聚合,它们会被当成两条完全不同的语句,平台看不到“这类查询在最近 5 分钟大量变慢”。

所以必须生成 SQL 指纹。

8.1 指纹的核心目标

  • 归一化字面值
  • 消除多余空白和注释
  • 保留 SQL 结构
  • 尽可能稳定地表示“同类语句”

8.2 常见归一化规则

  • 数字替换为 ?
  • 字符串替换为 ?
  • IN (...) 中的具体值归一化
  • 去掉注释
  • 统一大小写或关键字风格
  • 折叠多余空白

8.3 生产建议

如果你追求准确性,不要长期依赖纯正则方案,推荐三种方式:

  1. 使用 Percona Toolkit 的 fingerprint 规则
  2. 使用 MySQL 8.0 digest 能力作为辅助摘要
  3. 使用成熟 SQL Parser 做结构级归一化

正则做 demo 很快,但在复杂 SQL、嵌套子查询、批量插入、注释 Hint 场景下容易误判。

九、落地实现一:Filebeat 采集配置

下面给出一个更接近生产的 Sidecar 场景 Filebeat 配置。

filebeat.inputs:
  - type: filestream
    id: mysql-slowlog
    enabled: true
    paths:
      - /var/log/mysql/slow.log
    parsers:
      - multiline:
          type: pattern
          pattern: '^# Time:'
          negate: true
          match: after
          max_lines: 1000
          timeout: 3s
    fields:
      log_type: mysql_slow
      env: prod
      cluster: cn-hz-01
    fields_under_root: true
    prospector.scanner.check_interval: 5s
    close.on_state_change.inactive: 10m
    ignore_older: 24h

processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/"
  - add_host_metadata: ~

output.kafka:
  hosts:
    - kafka-0.kafka:9092
    - kafka-1.kafka:9092
    - kafka-2.kafka:9092
  topic: mysql-slowlog-raw
  codec.json:
    pretty: false
  partition.hash:
    hash: ["kubernetes.pod.name"]
  required_acks: 1
  compression: snappy
  max_message_bytes: 1048576
  channel_buffer_size: 2048

这里有几个比 demo 更重要的点:

  • 使用 filestream 替代旧 log input,更适合新版本 Filebeat
  • 多行合并以 # Time: 为边界,避免一条 SQL 被拆成多条
  • 加入基础标签,后续解析服务可直接继承
  • Kafka 分区策略不要完全随机,按实例或 Pod hash 更利于局部顺序

十、落地实现二:Kubernetes Sidecar 部署示例

如果你的 MySQL Pod 不方便把慢日志直接打到 stdout,Sidecar 是最稳妥的方案。

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: mysql-order
spec:
  serviceName: mysql-order
  replicas: 1
  selector:
    matchLabels:
      app: mysql-order
  template:
    metadata:
      labels:
        app: mysql-order
    spec:
      containers:
        - name: mysql
          image: mysql:8.0
          env:
            - name: MYSQL_ROOT_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: mysql-secret
                  key: root-password
          volumeMounts:
            - name: mysql-data
              mountPath: /var/lib/mysql
            - name: mysql-slowlog
              mountPath: /var/log/mysql
        - name: filebeat
          image: docker.elastic.co/beats/filebeat:8.12.2
          args: ["-c", "/etc/filebeat.yml", "-e"]
          env:
            - name: NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          volumeMounts:
            - name: mysql-slowlog
              mountPath: /var/log/mysql
            - name: filebeat-config
              mountPath: /etc/filebeat.yml
              subPath: filebeat.yml
      volumes:
        - name: mysql-slowlog
          emptyDir: {}
        - name: filebeat-config
          configMap:
            name: mysql-slowlog-filebeat
  volumeClaimTemplates:
    - metadata:
        name: mysql-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi

同时,MySQL 侧配置需要明确慢日志输出路径:

[mysqld]
slow_query_log=ON
slow_query_log_file=/var/log/mysql/slow.log
long_query_time=0.2
log_output=FILE
log_timestamps=UTC
min_examined_row_limit=100

十一、落地实现三:生产级 Go 解析服务

下面这段代码不是“玩具级正则 demo”,而是一个更接近生产思路的解析器框架。它体现的是几个关键设计:

  • Kafka Consumer Group 消费
  • Worker Pool 并发解析
  • SQL 指纹归一化
  • 批量写入下游
  • 失败消息隔离和可观测

11.1 数据结构定义

package slowlog

import "time"

type RawMessage struct {
    Value     string            `json:"value"`
    Source    string            `json:"source"`
    Cluster   string            `json:"cluster"`
    Namespace string            `json:"namespace"`
    Pod       string            `json:"pod"`
    Labels    map[string]string `json:"labels"`
}

type SlowQueryEvent struct {
    EventTime       time.Time         `json:"event_time"`
    Cluster         string            `json:"cluster"`
    Namespace       string            `json:"namespace"`
    Pod             string            `json:"pod"`
    Instance        string            `json:"instance"`
    DB              string            `json:"db"`
    User            string            `json:"user"`
    ClientIP        string            `json:"client_ip"`
    QueryTimeMS     int64             `json:"query_time_ms"`
    LockTimeMS      int64             `json:"lock_time_ms"`
    RowsSent        int64             `json:"rows_sent"`
    RowsExamined    int64             `json:"rows_examined"`
    SQLText         string            `json:"sql_text"`
    SQLDigest       string            `json:"sql_digest"`
    SQLTemplate     string            `json:"sql_template"`
    TraceID         string            `json:"trace_id"`
    Service         string            `json:"service"`
    TenantID        string            `json:"tenant_id"`
    Tags            map[string]string `json:"tags"`
    IngestTimestamp time.Time         `json:"ingest_timestamp"`
}

11.2 慢日志解析器

package slowlog

import (
    "regexp"
    "strconv"
    "strings"
    "time"
)

var (
    timePattern        = regexp.MustCompile(`(?m)^# Time: ([^\n]+)$`)
    userHostPattern    = regexp.MustCompile(`(?m)^# User@Host: ([^\[]+)\[[^\]]*\] @ .*?\[([^\]]*)\]`)
    metricsPattern     = regexp.MustCompile(`(?m)^# Query_time: ([0-9.]+)\s+Lock_time: ([0-9.]+)\s+Rows_sent: ([0-9]+)\s+Rows_examined: ([0-9]+)$`)
    useDBPattern       = regexp.MustCompile(`(?m)^use\s+([^\s;]+);$`)
    setTimestampPattern = regexp.MustCompile(`(?m)^SET timestamp=\d+;$`)
    tracePattern       = regexp.MustCompile(`/\*\s*trace_id:([a-zA-Z0-9\-_]+)\s*\*/`)
    numberPattern      = regexp.MustCompile(`\b\d+\b`)
    stringPattern      = regexp.MustCompile(`'[^']*'|"[^"]*"`)
    blankPattern       = regexp.MustCompile(`\s+`)
    commentPattern     = regexp.MustCompile(`/\*.*?\*/`)
)

func ParseSlowLog(raw RawMessage) (*SlowQueryEvent, error) {
    body := strings.TrimSpace(raw.Value)

    tMatch := timePattern.FindStringSubmatch(body)
    mMatch := metricsPattern.FindStringSubmatch(body)
    uMatch := userHostPattern.FindStringSubmatch(body)
    if len(tMatch) < 2 || len(mMatch) < 5 || len(uMatch) < 3 {
        return nil, ErrInvalidSlowLog
    }

    eventTime, err := time.Parse(time.RFC3339Nano, strings.TrimSpace(tMatch[1]))
    if err != nil {
        return nil, err
    }

    queryTimeSec, _ := strconv.ParseFloat(mMatch[1], 64)
    lockTimeSec, _ := strconv.ParseFloat(mMatch[2], 64)
    rowsSent, _ := strconv.ParseInt(mMatch[3], 10, 64)
    rowsExamined, _ := strconv.ParseInt(mMatch[4], 10, 64)

    db := ""
    if dbMatch := useDBPattern.FindStringSubmatch(body); len(dbMatch) == 2 {
        db = dbMatch[1]
    }

    sqlText := extractSQL(body)
    sqlTemplate := NormalizeSQL(sqlText)
    traceID := ""
    if traceMatch := tracePattern.FindStringSubmatch(sqlText); len(traceMatch) == 2 {
        traceID = traceMatch[1]
    }

    return &SlowQueryEvent{
        EventTime:       eventTime,
        Cluster:         raw.Cluster,
        Namespace:       raw.Namespace,
        Pod:             raw.Pod,
        Instance:        raw.Source,
        DB:              db,
        User:            strings.TrimSpace(uMatch[1]),
        ClientIP:        strings.TrimSpace(uMatch[2]),
        QueryTimeMS:     int64(queryTimeSec * 1000),
        LockTimeMS:      int64(lockTimeSec * 1000),
        RowsSent:        rowsSent,
        RowsExamined:    rowsExamined,
        SQLText:         sqlText,
        SQLTemplate:     sqlTemplate,
        SQLDigest:       DigestSQL(sqlTemplate),
        TraceID:         traceID,
        Service:         raw.Labels["app.kubernetes.io/name"],
        TenantID:        raw.Labels["tenant"],
        Tags:            raw.Labels,
        IngestTimestamp: time.Now().UTC(),
    }, nil
}

func extractSQL(body string) string {
    lines := strings.Split(body, "\n")
    sqlLines := make([]string, 0, len(lines))
    for _, line := range lines {
        trimmed := strings.TrimSpace(line)
        if strings.HasPrefix(trimmed, "# ") {
            continue
        }
        if setTimestampPattern.MatchString(trimmed) {
            continue
        }
        if strings.HasPrefix(strings.ToLower(trimmed), "use ") {
            continue
        }
        if trimmed == "" {
            continue
        }
        sqlLines = append(sqlLines, line)
    }
    return strings.TrimSpace(strings.Join(sqlLines, "\n"))
}

func NormalizeSQL(sql string) string {
    normalized := commentPattern.ReplaceAllString(sql, "")
    normalized = stringPattern.ReplaceAllString(normalized, "?")
    normalized = numberPattern.ReplaceAllString(normalized, "?")
    normalized = blankPattern.ReplaceAllString(normalized, " ")
    return strings.TrimSpace(strings.ToLower(normalized))
}

11.3 指纹生成与消费主流程

package slowlog

import (
    "context"
    "crypto/sha1"
    "encoding/hex"
    "encoding/json"
    "log/slog"
    "sync"
    "time"

    "github.com/segmentio/kafka-go"
)

func DigestSQL(template string) string {
    sum := sha1.Sum([]byte(template))
    return hex.EncodeToString(sum[:])
}

type Writer interface {
    BulkWrite(ctx context.Context, events []*SlowQueryEvent) error
}

type Consumer struct {
    reader     *kafka.Reader
    writer     Writer
    logger     *slog.Logger
    workerNum  int
    batchSize  int
    flushEvery time.Duration
}

func (c *Consumer) Run(ctx context.Context) error {
    parsedCh := make(chan *SlowQueryEvent, c.batchSize*4)
    wg := sync.WaitGroup{}

    for i := 0; i < c.workerNum; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                msg, err := c.reader.FetchMessage(ctx)
                if err != nil {
                    return
                }

                var raw RawMessage
                if err := json.Unmarshal(msg.Value, &raw); err != nil {
                    c.logger.Error("unmarshal raw message failed", "err", err)
                    _ = c.reader.CommitMessages(ctx, msg)
                    continue
                }

                event, err := ParseSlowLog(raw)
                if err != nil {
                    c.logger.Warn("parse slow log failed", "err", err, "source", raw.Source)
                    _ = c.reader.CommitMessages(ctx, msg)
                    continue
                }

                parsedCh <- event
                if err := c.reader.CommitMessages(ctx, msg); err != nil {
                    c.logger.Error("commit kafka message failed", "err", err)
                }
            }
        }()
    }

    flushTicker := time.NewTicker(c.flushEvery)
    defer flushTicker.Stop()

    batch := make([]*SlowQueryEvent, 0, c.batchSize)
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case item := <-parsedCh:
            if item != nil {
                batch = append(batch, item)
            }
            if len(batch) >= c.batchSize {
                if err := c.writer.BulkWrite(ctx, batch); err != nil {
                    c.logger.Error("bulk write failed", "err", err, "size", len(batch))
                }
                batch = batch[:0]
            }
        case <-flushTicker.C:
            if len(batch) == 0 {
                continue
            }
            if err := c.writer.BulkWrite(ctx, batch); err != nil {
                c.logger.Error("periodic bulk write failed", "err", err, "size", len(batch))
            }
            batch = batch[:0]
        }
    }
}

这段代码重点不在语法本身,而在体现以下生产思想:

  • 消费端并发通过 worker pool 提升吞吐
  • 批量刷写减少下游写入放大
  • 解析失败消息要隔离,不能阻塞主消费链路
  • 写入、提交、解析都要有日志和指标

如果你要进一步做强,可以继续补:

  • 死信队列
  • 幂等写入
  • 指标采集
  • Trace 透传
  • 配置热更新

十二、落地实现四:Elasticsearch 明细索引设计

对于明细检索,ES 依旧是很强的选择,但前提是 mapping 要合理。

{
  "index_patterns": ["mysql-slowlog-detail-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "refresh_interval": "15s",
      "codec": "best_compression",
      "index.lifecycle.name": "mysql-slowlog-detail-ilm"
    },
    "mappings": {
      "dynamic": "false",
      "properties": {
        "event_time": { "type": "date" },
        "cluster": { "type": "keyword" },
        "namespace": { "type": "keyword" },
        "pod": { "type": "keyword" },
        "instance": { "type": "keyword" },
        "db": { "type": "keyword" },
        "user": { "type": "keyword" },
        "client_ip": { "type": "ip" },
        "query_time_ms": { "type": "long" },
        "lock_time_ms": { "type": "long" },
        "rows_sent": { "type": "long" },
        "rows_examined": { "type": "long" },
        "sql_digest": { "type": "keyword" },
        "sql_template": {
          "type": "text",
          "fields": {
            "raw": { "type": "keyword", "ignore_above": 2048 }
          }
        },
        "sql_text": {
          "type": "text",
          "index": false
        },
        "trace_id": { "type": "keyword" },
        "service": { "type": "keyword" },
        "tenant_id": { "type": "keyword" }
      }
    }
  }
}

几个生产建议非常重要:

  • sql_text 通常只存不索引,否则索引成本很高
  • 检索与聚合主要依赖 sql_digestservicedbtenant_id
  • dynamic=false 避免标签乱写把 mapping 打爆
  • 用 ILM 做冷热分层,避免明细长期堆在热节点

Elasticsearch 的索引模板设计直接影响平台长期的可维护性,务必在一开始就规划好分片策略和生命周期管理。

十三、落地实现五:ClickHouse 聚合表设计

如果你的平台要做 Top 慢 SQL 榜单、按分钟趋势、租户维度排行、过去 7 天恶化分析,那么聚合层用列式存储更划算。

CREATE TABLE mysql_slowlog_agg_minute
(
    bucket_time       DateTime,
    cluster           String,
    service           String,
    tenant_id         String,
    instance          String,
    db                String,
    sql_digest        String,
    sample_sql        String,
    total_count       UInt64,
    avg_query_time_ms UInt64,
    p95_query_time_ms UInt64,
    p99_query_time_ms UInt64,
    max_query_time_ms UInt64,
    avg_rows_examined UInt64,
    total_lock_time_ms UInt64
)
ENGINE = ReplacingMergeTree
PARTITION BY toDate(bucket_time)
ORDER BY (cluster, service, db, sql_digest, bucket_time)
TTL bucket_time + INTERVAL 90 DAY;

这个表不存所有明细,只存分钟级聚合结果。优势很明显:

  • 榜单查询快
  • 聚合成本低
  • 存储成本远低于全量明细

常见用法:

  • 最近 10 分钟 Top 20 最慢 SQL
  • 某业务过去 24 小时 P99 恶化排行
  • 某租户慢查询占总请求比例变化趋势

十四、告警设计:别只做阈值告警,要做“恶化告警”

很多平台上线后,告警一开始很多,后来大家都关了。原因不是平台没数据,而是规则太粗暴。

14.1 阈值告警只能解决最初级问题

例如:

  • 平均耗时超过 2 秒
  • 5 分钟内出现次数超过 50

这种规则能发现明显异常,但也很容易造成噪音。

14.2 更有效的是恶化型规则

例如:

  • 同一 sql_digest 最近 5 分钟平均耗时较前 30 分钟上涨 300%
  • 某业务慢查询占比从 0.2% 升到 5%
  • 某租户的 rows_examined/query_count 突然异常放大
  • lock_time_ms/query_time_ms 占比持续高于阈值,说明锁问题恶化

14.3 示例规则

rule_name: sql_digest_regression
window: 5m
baseline_window: 30m
group_by:
  - cluster
  - service
  - db
  - sql_digest
conditions:
  - metric: count
    op: ">="
    value: 20
  - metric: avg_query_time_ms
    compare_to: baseline.avg_query_time_ms
    op: ">="
    value: 3.0
  - metric: avg_rows_examined
    compare_to: baseline.avg_rows_examined
    op: ">="
    value: 2.0
actions:
  - type: dingtalk
  - type: jira
  - type: webhook

这类告警更接近生产价值,因为它能识别“同一类 SQL 在恶化”,而不是只看到绝对值。

十五、与可观测体系打通:慢日志不能孤岛化

真正的生产事故排查,不会只盯着慢日志看。你一定会同时看:

  • 应用 P95/P99
  • 数据库连接数
  • InnoDB 行锁等待
  • CPU、IO、磁盘延迟
  • 调用链 Trace

所以平台最好具备下面几种联动能力。

15.1 与 APM/Trace 联动

应用在 SQL 前注入注释:

/* trace_id:7fdac9a1 service:order-api tenant:t-001 */
SELECT * FROM orders WHERE id = 123456;

这样解析器提取 trace_id 后,就可以从慢 SQL 直接跳到完整链路。

15.2 与 CMDB/服务注册联动

平台拿到 instance 后,不应只显示 IP,而应自动富化出:

  • 所属业务线
  • 所属团队
  • 所属环境
  • 所属租户
  • 主从角色

这样告警才能直接发给正确团队。

15.3 与治理动作联动

当某类 SQL 风暴爆发时,平台可以触发:

  • 通知 DBA 和业务值班
  • 创建工单
  • 调用限流平台降级
  • 修改熔断阈值
  • 在极端情况下执行 kill 策略

注意,这里的自动 Kill 一定要非常谨慎,通常只建议:

  • 先告警再人工确认
  • 或仅对特定白名单规则执行自动处理

十六、真实案例:平台如何在 5 分钟内完成发现、定位、止血

下面给一个更完整的案例,帮助你理解平台上线后的实际价值。

16.1 背景

支付域某次版本发布后,订单查询接口延迟明显升高。该接口流量大,涉及订单表与优惠券表联查。

16.2 事故发展时间线

02:03

  • 新版本开始灰度

02:08

  • 某条查询因为谓词改写,原本命中的复合索引失效

02:10

  • 平台聚合层发现某个 sql_digest 的平均耗时从 120ms 上升到 2.4s
  • 同时 rows_examined 从 800 上升到 180,000

02:11

  • 告警触发并携带以下信息:
    • 业务:payment-query
    • 集群:prod-cn-shanghai
    • 实例:mysql-pay-03
    • Digest:7d5f...
    • 5 分钟平均耗时:2430ms
    • 环比上涨:18.7x
    • 示例 SQL:脱敏模板

02:12

  • 值班同学从告警链接直达仪表盘,发现异常只出现在灰度版本实例
  • 通过 trace_id 跳转 APM,确认慢 SQL 来自新接口代码路径

02:14

  • 业务团队执行回滚
  • DBA 临时补建索引并验证执行计划

02:18

  • 平台观察到该 digest 的 5 分钟窗口数据恢复正常
  • 告警自动关闭

16.3 平台在其中起到的作用

如果没有平台,这次故障大概率会演变成:

  • 先看到应用超时
  • 再去查连接数
  • 再登库查进程
  • 再翻日志定位 SQL
  • 最后人工推断是哪次发布导致

而有了平台后,路径被缩短为:

告警直达异常 digest -> 下钻到具体实例 -> 联动 Trace -> 回滚和修复

这就是工程平台的价值:它不是替你优化 SQL,而是把“发现问题和组织协同”的时间从 30 分钟压缩到 5 分钟以内。

十七、常见坑位:平台为什么上线后容易“看起来有,实际上不好用”

这部分非常重要,因为大多数平台不是设计死的,而是死在这些工程细节上。

17.1 多行合并错误,导致一条 SQL 被拆成多条

后果:

  • SQL 文本不完整
  • 指纹不稳定
  • 统计结果失真

解决:

  • 明确以 # Time: 作为日志边界
  • 压测超长 SQL 和批量语句场景
  • 为 multiline 设置 max_linestimeout

17.2 日志轮转与 inode 切换导致丢日志

后果:

  • 慢日志平台出现断点
  • 故障时最关键窗口数据缺失

解决:

  • 优先使用容器标准日志机制
  • 或确保采集器支持轮转感知
  • 明确 copytruncate 与 rename 策略影响

17.3 Kafka 分区太少,消费扩不起来

后果:

  • 消费延迟积压
  • 明明扩了 Pod,吞吐没提升

解决:

  • 分区数按未来峰值预留
  • 消费组扩缩容与分区规划联动

17.4 ES 直接被写爆

后果:

  • 查询慢
  • 写入拒绝
  • 集群 merge 压力过高

解决:

  • 提高 refresh_interval
  • 批量写入
  • 明细与聚合分层
  • 用 ILM 做冷热分离

17.5 没有业务标签,最后只能看到“哪个 IP 慢”

后果:

  • 告警找不到责任方
  • 统计无法按业务和租户拆分

解决:

  • 在采集阶段或解析阶段统一打通 K8s、CMDB、服务注册信息

17.6 只做静态阈值,告警疲劳

后果:

  • 告警一堆,没人看

解决:

  • 引入环比、同比、占比、突增、异常分布规则

十八、从平台到中台:下一步还能怎么演进

如果这套系统已经稳定跑起来,下一阶段演进方向通常有六个。

18.1 SQL 优化建议自动化

基于:

  • 扫描行数
  • 排序/临时表特征
  • 历史执行趋势

自动生成优化建议,例如:

  • 建议补复合索引
  • 建议改造分页方式
  • 建议拆分大事务

18.2 引入 EXPLAIN 样本回采

对高价值慢 SQL 做异步样本回采,补充:

  • 执行计划
  • 命中索引
  • 估算行数

注意必须脱离主执行链路,不能在慢日志处理链路中同步回查数据库。

18.3 加入异常检测模型

对 digest 序列做异常检测,识别:

  • 工作日/周末行为差异
  • 周期性峰值
  • 非线性突增

18.4 建立慢 SQL 画像中心

每个 sql_digest 形成长期画像:

  • 典型耗时区间
  • 高发时间段
  • 涉及业务
  • 影响租户
  • 历史事故记录

18.5 融合 Performance Schema

把 digest 聚合、等待事件与慢日志明细打通,形成更完整的诊断面。

18.6 多数据中心联邦查询

每个区域本地采、本地算、中心化查询,避免跨地域全量传输。

十九、实施路线图:别一次性做太重,分三阶段推进

很多团队看完架构图就想一步到位,最后反而因为工程量太大推进失败。更现实的落地方式是分阶段建设。

第一阶段:先解决“看得见”

目标:

  • 收齐慢日志
  • 建立基础检索和大盘
  • 能按实例、库、SQL 模板查询

建议链路:

Filebeat -> Kafka -> Go Parser -> Elasticsearch -> Kibana

第二阶段:再解决“看得懂”

目标:

  • 完成 SQL 指纹聚合
  • 做分钟级统计
  • 支持按业务、租户、服务维度分析

新增能力:

  • 聚合表
  • 告警规则
  • CMDB/Trace 富化

第三阶段:最后解决“能治理”

目标:

  • 告警自动分发
  • 故障联动
  • 部分自动化止血

新增能力:

  • 工单和通知系统联动
  • 降级/限流/自动 kill 策略
  • 历史画像和治理闭环

这三步走的好处是:每阶段都能独立交付价值,不会因为最终形态太大而中途夭折。

二十、总结:慢日志平台的真正价值,不在日志,而在治理

回到文章开头的问题。

为什么很多团队明明开了慢查询日志,事故来了还是手足无措?

因为他们拥有的是“慢日志文件”,而不是“慢查询治理平台”。

一套真正专业、可持续演进的方案,至少要具备这些能力:

  • FILE 慢日志为基础入口,低侵入采集
  • 通过 Kafka 建立削峰填谷和重放能力
  • 通过解析层完成 SQL 指纹、标签富化和分钟聚合
  • 通过 ES/ClickHouse 分离明细检索与统计分析
  • 通过告警、Trace、CMDB、治理动作构建闭环

更重要的是,它要经得住高并发、故障洪峰和组织协同,而不是只在测试环境里“能跑”。

如果你准备把这件事真正落地,建议从一个核心业务库开始,先跑通最小闭环:

  1. 打开稳定可控的慢日志配置
  2. 用 Sidecar 或 DaemonSet 收集日志
  3. 做 SQL 指纹解析和分钟聚合
  4. 建立第一个 TopN 大盘和第一条恶化告警
  5. 打通业务标签和 Trace 链路

当你完成这一步之后,慢查询对团队来说就不再是“深夜靠经验排障”的黑盒,而会变成一类可以实时观测、快速定位、持续治理的工程问题。

而这,才是云原生时代数据库可观测平台该有的样子。




上一篇:Kubernetes生产落地全景实践:从崩溃到自愈的系统工程治理
下一篇:Burp Suite 2026.4 更新与破解教程:注册机+汉化一步到位
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-10 04:26 , Processed in 0.652064 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表