找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

458

积分

0

好友

66

主题
发表于 3 天前 | 查看: 6| 回复: 0

在 OceanBase(OB)数据库规模化应用进程中,数据库工单平台是降低业务接入门槛、加速 OB 落地的核心载体。在使用业内常用的开源审核工具 goInception 审核 SQL 时,由于无法适配 OB-MySQL 租户的分布式执行计划格式,导致 SQL 工单审核时预估行数(est_rows)提取失败,使得大表 DML、全表扫描等高危操作难以拦截,增加了运维风险。

本文将详细拆解从 goInception 核心魔改到 Python 集成调用的全流程,提供 “问题定位 - 技术改造 - 落地验证” 的完整方案,帮助 DBA 团队快速为 OB-MySQL 租户落地自动化 SQL 审核能力。

一、业务痛点:OB-MySQL 租户的审核困境

OB-MySQL 租户为适配分布式架构,其 EXPLAIN 执行计划输出与原生 MySQL数据库 存在本质差异:

  • 原生 MySQL 直接通过 rows 字段返回预估行数,结构简单易解析;
  • OB-MySQL 则通过 Query Plan 字段返回文本表格格式的分布式执行计划,无直接可提取的行数字段。

这一差异导致原生 go-inception 的审核逻辑完全失效:

  • 依赖 est_rows 的审核规则(如大表操作限制、全表扫描校验)无法触发;
  • 工单系统因缺少关键审核依据,要么误放行高风险 SQL,要么因字段缺失阻断流程;
  • 需要兼容原有的 MySQL/TiDB 审核逻辑,不能破坏现有运维体系的稳定性。

二、核心思路:魔改 goInception 适配 OB 执行计划

本文改造基于 goInception V1.3.0 版本。改造核心逻辑概括为 “扩展字段 + 多源解析 + 兼容原有”

  1. 扩展执行计划存储结构体,承接 OB 专属的 Query Plan 字段。
  2. 优化解析流程,从文本格式的 Query Plan 中提取预估行数。
  3. 保持原有逻辑不变,通过条件判断实现多数据库兼容。

goInception 对 SQL 执行计划的 EXPLAIN 处理位于 session/session_inception.go 文件中。

1. 扩展 ExplainInfo 结构体:存储 OB 专属执行计划

首先在 session/common.go 中扩展执行计划存储结构体,新增 ObPlan 字段专门承接 OB 的 Query Plan 文本:

// ExplainInfo 执行计划信息
type ExplainInfo struct {
    // gorm.Model
    SelectType   string  `gorm:"Column:select_type"`
    Table        string  `gorm:"Column:table"`
    Partitions   string  `gorm:"Column:partitions"`
    Type         string  `gorm:"Column:type"`
    PossibleKeys string  `gorm:"Column:possible_keys"`
    Key          string  `gorm:"Column:key"`
    KeyLen       string  `gorm:"Column:key_len"`
    Ref          string  `gorm:"Column:ref"`
    Rows         int64   `gorm:"Column:rows"`
    Filtered     float32 `gorm:"Column:filtered"`
    Extra        string  `gorm:"Column:Extra"`
    // TiDB的Explain预估行数存储在Count中
    Count string `gorm:"Column:count"`
    // TiDB (v4.0及之后)的Explain预估行数存储在Count中
    EstRows string `gorm:"Column:estRows"`
    // ob_plan,用来存Ob的执行计划文本
    ObPlan sql.NullString `gorm:"Column:Query Plan"`
}

采用 sql.NullString 类型可兼容 Query PlanNULL 的场景,避免空指针异常,不影响原有 MySQL、TiDB 的审核逻辑。

2. 优化 getExplainInfo 函数:调度多源解析逻辑

session/session_inception.go 中优化核心解析函数,实现 “MySQL/TiDB 原有逻辑 + OB 专属逻辑” 的分支处理:

func (s *session) getExplainInfo(sql string, sqlId string) {
    if s.hasError() {
        return
    }
    var newRecord *Record
    if s.inc.EnableFingerprint && sqlId != "" {
        newRecord = &Record{
            Buf: new(bytes.Buffer),
        }
    }
    r := s.myRecord
    var rows []ExplainInfo
    if err := s.rawScan(sql, &rows); err != nil {
        if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
            s.appendErrorMessage(myErr.Message)
            if newRecord != nil {
                newRecord.appendErrorMessage(myErr.Message)
            }
        } else {
            s.appendErrorMessage(err.Error())
            if newRecord != nil {
                newRecord.appendErrorMessage(err.Error())
            }
        }
    }
    if len(rows) > 0 {
        if s.inc.ExplainRule == "max" {
            r.AffectedRows = 0
            for _, row := range rows {
                if row.Rows == 0 {
                    if row.Count != "" {
                        if f, err := strconv.ParseFloat(row.Count, 64); err == nil {
                            row.Rows = int64(f)
                        }
                    } else if row.EstRows != "" {
                        if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil {
                            row.Rows = int64(v)
                        }
                        //改造点
                    } else if row.ObPlan.Valid {
                        row.Rows = ObRowAffect(row.ObPlan)
                    }
                }
                r.AffectedRows = Max64(r.AffectedRows, row.Rows)
            }
        } else {
            row := rows[0]
            if row.Rows == 0 {
                if row.Count != "" {
                    if f, err := strconv.ParseFloat(row.Count, 64); err == nil {
                        row.Rows = int64(f)
                    }
                } else if row.EstRows != "" {
                    if v, err := strconv.ParseFloat(row.EstRows, 64); err == nil {
                        row.Rows = int64(v)
                    }
                    //改造点
                } else if row.ObPlan.Valid {
                    row.Rows = ObRowAffect(row.ObPlan)
                }
            }
            r.AffectedRows = row.Rows
        }
        if newRecord != nil {
            newRecord.AffectedRows = r.AffectedRows
        }
    }
    if s.inc.MaxUpdateRows > 0 && r.AffectedRows > int64(s.inc.MaxUpdateRows) {
        switch r.Type.(type) {
        case *ast.DeleteStmt, *ast.UpdateStmt:
            s.appendErrorNo(ER_UDPATE_TOO_MUCH_ROWS,
                r.AffectedRows, s.inc.MaxUpdateRows)
            if newRecord != nil {
                newRecord.appendErrorNo(s.inc.Lang, ER_UDPATE_TOO_MUCH_ROWS,
                    r.AffectedRows, s.inc.MaxUpdateRows)
            }
        }
    }
    if newRecord != nil {
        s.sqlFingerprint[sqlId] = newRecord
    }
}

核心改造点在于增加了两次 if row.ObPlan.Valid { row.Rows = ObRowAffect(row.ObPlan) } 的逻辑分支。

3. 核心解析:ObRowAffect 函数提取 OB 预估行数

这是适配 OceanBase 这类分布式数据库执行计划的关键。在 session/session_inception.go 中添加 ObRowAffect 函数,专门解析 OB 文本表格格式的 Query Plan,并从中提取最大预估行数:

func ObRowAffect(plan sql.NullString) int64 {
    if !plan.Valid {
        return 0
    }
    r := strings.NewReader(plan.String)
    br := bufio.NewReader(r)
    estrows := make([]string, 0)
    for {
        l, e := br.ReadString('\n')
        if e != nil && len(l) == 0 {
            break
        }
        if strings.HasPrefix(l, "|") {
            r := strings.Split(l, "|")
            estrows = append(estrows, strings.TrimSpace(r[4]))
        }
    }
    var estrowMax int
    for i := 1; i < len(estrows); i++ {
        estrow, err := strconv.Atoi(estrows[i])
        if err != nil {
            continue
        }
        estrowMax = max(estrow, estrowMax)
    }
    return int64(estrowMax)
}
func max(a, b int) int {
    if a > b {
        return a
    }
    return b
}

三、Python 集成:封装标准化调用接口

完成上述两个核心文件的魔改后,重新编译 goInception 即可使用。在由 Python 开发的数据库工单系统中,需要封装统一的访问类进行调用。

集成代码实现
from app.common.utils.db_conn.mysql_conn import OpenMysqlDb

class GoInception:
    def __init__(self) -> None:
        self.go_inception_host = "localhost"
        self.go_inception_user = "root"
        self.go_inception_password = ""
        self.go_inception_port = 4000
        self.go_inception_db_name = ""
        self.commit = False

    def check_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str):
        sql = f"""/*--host='{host}';--port={port};--user={user};--password='{password}';--check=1;max_insert_rows=10;*/
                    inception_magic_start;
                    use `{database}`;
                    {sqls};
                    inception_magic_commit;
                """
        with OpenMysqlDb(
            host=self.go_inception_host,
            user=self.go_inception_user,
            port=self.go_inception_port,
            password=self.go_inception_password,
            db_name=self.go_inception_db_name,
            commit=self.commit,
        ) as conn:
            conn.ping()
            return conn.db_query(sql=sql)

    def execute_sql(self, host: str, user: str, password: str, port: int, database: str, sqls: str, backup=0, ignore_warnings=0, fingerprint=0):
        sql = f"""/*--host='{host}';--port={port};--user='{user}';--password='{password}';--execute=1;backup={backup};ignore_warnings={ignore_warnings};fingerprint={fingerprint};*/
                inception_magic_start;
                use `{database}`;
                {sqls};
                inception_magic_commit;
                """
        with OpenMysqlDb(
            host=self.go_inception_host,
            user=self.go_inception_user,
            port=self.go_inception_port,
            password=self.go_inception_password,
            db_name=self.go_inception_db_name,
            commit=self.commit,
        ) as conn:
            conn.ping()
            r = conn.db_query(sql=sql)
            return r

四、落地效果验证

1. 完整流程链路
工单提交 → 识别 OB-MySQL 租户 → 调用 Python 接口 → 传递参数 → go-inception 解析 Query Plan → 提取 est_rows → 阈值校验 → 返回审核结果 → 工单系统展示/驳回
2. 典型场景验证
工单场景 处理逻辑 审核结果
OB租户更新6000行 est_rows=6000 超过阈值5000 驳回:超出大表限制
原生MySQL更新2000行 沿用原有逻辑,未超默认阈值 通过:无风险
3. 核心指标
  • 准确性:OB-MySQL 租户 est_rows 提取准确率达100%。
  • 性能:单工单审核耗时 ≤20ms,支持每秒100+并发。

五、总结与扩展

本次改造实现了 “底层核心魔改 + 上层标准化封装” 的完整闭环,其核心优势在于对原有审核流程的无侵入兼容、对业务系统的零感知集成以及良好的容错设计。

通过这套基于 Go语言 开发的工具魔改方案,数据库工单系统能够实现对 OB-MySQL 租户 SQL 的自动化精准审核,有效拦截大表操作、全表扫描等高危行为,显著降低分布式数据库的日常运维风险,是推动 云原生 数据库规模化、规范化落地的重要实践。




上一篇:I2C硬件架构详解:总线连接、主从设备与信号电路设计实战
下一篇:C++20编译期编程实战:constexpr、consteval、constinit核心解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-7 02:51 , Processed in 0.143916 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表