找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1135

积分

1

好友

152

主题
发表于 前天 03:17 | 查看: 4| 回复: 0

前置知识:Go语言、分布式系统、etcd、Milvus架构基础

📋 概述

RootCoord 的定位

RootCoord(Root Coordinator)堪称Milvus分布式集群的“大脑”,它承担着全局元数据管理与DDL(数据定义语言)操作协调的核心职责。具体功能包括:

  1. 元数据管理:负责Database、Collection、Partition、Field等所有元数据的生命周期。
  2. DDL协调:统一调度CREATE、DROP、ALTER等数据定义操作。
  3. ID分配:为各类实体(如Collection、Partition)分配全局唯一ID。
  4. 时间戳服务:提供全局递增的TSO(Timestamp Oracle),保证操作顺序。
  5. 权限管理:管理基于角色的访问控制(RBAC)体系。

核心功能列表

RootCoord 核心功能
├── 元数据管理
│   ├── Database CRUD
│   ├── Collection CRUD
│   ├── Partition CRUD
│   └── Alias 管理
├── DDL 任务调度
│   ├── 任务队列管理
│   ├── 任务执行引擎
│   └── 分布式锁机制
├── ID 分配服务
│   ├── Collection ID
│   ├── Partition ID
│   └── Segment ID
├── TSO 时间戳服务
│   └── 全局时间戳分配
└── RBAC 权限管理
    ├── 用户管理
    ├── 角色管理
    └── 权限控制

与其他组件的关系

┌─────────────────────────────────────────────┐
│                   Proxy                      │
│          (接收客户端 DDL 请求)               │
└──────────────────┬──────────────────────────┘
                   │ gRPC
                   ▼
┌─────────────────────────────────────────────┐
│               RootCoord                      │
│  ┌─────────────────────────────────────┐   │
│  │         Scheduler (任务调度)         │   │
│  └─────────────────────────────────────┘   │
│  ┌─────────────────────────────────────┐   │
│  │       MetaTable (元数据管理)         │   │
│  └─────────────────────────────────────┘   │
│  ┌─────────────────────────────────────┐   │
│  │      ID Allocator (ID 分配)         │   │
│  └─────────────────────────────────────┘   │
│  ┌─────────────────────────────────────┐   │
│  │      TSO Allocator (时间戳)         │   │
│  └─────────────────────────────────────┘   │
└──────────────┬──────────────┬───────────────┘
               │              │
               │ etcd         │ 通知其他组件
               ▼              ▼
        ┌─────────┐    ┌──────────────┐
        │  etcd   │    │ DataCoord    │
        │ (元数据)│    │ QueryCoord   │
        └─────────┘    └──────────────┘

🗂️ 源码结构

目录结构

internal/rootcoord/
├── root_coord.go              # 主服务实现,Core 结构体
├── scheduler.go               # 任务调度器
├── meta_table.go              # 元数据表管理
├── task.go                    # 任务接口定义
│
# DDL Callbacks(DDL 操作实现)
├── ddl_callbacks.go                           # Callback 基础
├── ddl_callbacks_create_collection.go         # 创建 Collection
├── ddl_callbacks_drop_collection.go           # 删除 Collection
├── ddl_callbacks_alter_collection*.go         # 修改 Collection
├── ddl_callbacks_create_partition.go          # 创建 Partition
├── ddl_callbacks_drop_partition.go            # 删除 Partition
├── ddl_callbacks_create_database.go           # 创建 Database
├── ddl_callbacks_drop_database.go             # 删除 Database
├── ddl_callbacks_alter_database.go            # 修改 Database
├── ddl_callbacks_*_alias.go                   # Alias 操作
├── ddl_callbacks_rbac*.go                     # RBAC 操作
│
# 任务实现
├── create_collection_task.go  # 创建 Collection 任务
├── drop_collection_task.go    # 删除 Collection 任务
├── alter_collection_task.go   # 修改 Collection 任务
├── has_collection_task.go     # 检查 Collection 任务
├── describe_collection_task.go # 描述 Collection 任务
├── show_collection_task.go    # 展示 Collection 任务
├── *_partition_task.go        # Partition 相关任务
├── *_db_task.go               # Database 相关任务
├── rbac_task.go               # RBAC 相关任务
│
# 辅助模块
├── quota_center.go           # 配额管理
├── expire_cache.go           # 缓存过期
├── dml_channels.go           # DML Channel 管理
├── timeticksync.go           # 时间同步
├── ddl_ts_lock_manager.go    # DDL 时间戳锁
└── util.go                   # 工具函数

核心数据结构

首先分析RootCoord的核心结构体:

// internal/rootcoord/root_coord.go
// Core root coordinator core
type Core struct {
    ctx              context.Context
    cancel           context.CancelFunc
    wg               sync.WaitGroup

    // 客户端连接
    etcdCli          *clientv3.Client      // etcd 客户端
    tikvCli          *txnkv.Client         // TiKV 客户端(可选)

    // 核心组件
    meta             IMetaTable            // 元数据表
    scheduler        IScheduler            // 任务调度器

    // ID 分配器
    idAllocator      allocator.Interface   // 全局 ID 分配器
    tsoAllocator     tso.Allocator         // TSO 时间戳分配器

    // Broker(与其他组件通信)
    broker           Broker                // 消息代理

    // 会话管理
    session          *sessionutil.Session  // 服务会话

    // 其他组件
    quotaCenter      *QuotaCenter          // 配额中心
    expireCache      *ExpireCache          // 缓存过期管理
    channelManager   *dmlChannelManager    // DML Channel 管理

    // 状态
    stateCode        atomic.Value          // 服务状态
    initOnce         sync.Once             // 初始化标志
}

关键字段说明

  1. meta (IMetaTable):元数据管理的核心接口,负责所有元数据的CRUD操作。
  2. scheduler (IScheduler):任务调度器,管理DDL任务的执行顺序和并发控制。
  3. idAllocator:全局ID分配器,为Collection、Partition等分配唯一ID。
  4. tsoAllocator:TSO时间戳分配器,提供全局递增的时间戳。
  5. broker:与DataCoord、QueryCoord等其他组件通信的代理。

🚀 核心流程解析

1. RootCoord 启动流程

深入分析RootCoord服务的启动过程:

// internal/rootcoord/root_coord.go
func (c *Core) Init() error {
    // 1. 初始化 etcd 客户端
    c.initEtcd()

    // 2. 初始化会话(注册到 etcd)
    c.initSession()

    // 3. 初始化 ID 分配器
    c.initIDAllocator()

    // 4. 初始化 TSO 分配器
    c.initTSOAllocator()

    // 5. 初始化元数据表
    c.initMeta()

    // 6. 初始化任务调度器
    c.initScheduler()

    // 7. 初始化其他组件
    c.initBroker()
    c.initQuotaCenter()
    c.initExpireCache()

    return nil
}

func (c *Core) Start() error {
    // 1. 启动 ID 分配器
    c.idAllocator.Start()

    // 2. 启动 TSO 分配器
    c.tsoAllocator.Start()

    // 3. 启动任务调度器
    c.scheduler.Start()

    // 4. 启动配额中心
    c.quotaCenter.Start()

    // 5. 启动缓存过期管理
    c.expireCache.Start()

    // 6. 更新服务状态
    c.UpdateStateCode(commonpb.StateCode_Healthy)

    return nil
}

启动流程图

┌─────────────────────────────────────────────┐
│              RootCoord.Init()               │
└──────────────────┬──────────────────────────┘
                   │
    ┌──────────────┼──────────────┐
    │              │              │
    ▼              ▼              ▼
┌────────┐   ┌─────────┐   ┌──────────┐
│ etcd   │   │ Session │   │ ID/TSO   │
│ Client │   │ Register│   │Allocator │
└────────┘   └─────────┘   └──────────┘
                   │
                   ▼
         ┌─────────────────┐
         │   MetaTable     │
         │  (加载元数据)    │
         └─────────────────┘
                   │
                   ▼
         ┌─────────────────┐
         │   Scheduler     │
         │  (启动调度器)    │
         └─────────────────┘
                   │
                   ▼
         ┌─────────────────┐
         │  RootCoord      │
         │   Ready!        │
         └─────────────────┘

2. 元数据管理核心实现

MetaTable是RootCoord的核心组件,负责所有元数据的内存缓存与持久化。

// internal/rootcoord/meta_table.go
type MetaTable struct {
    ctx    context.Context

    // 元数据存储
    catalog metastore.RootCoordCatalog  // 元数据持久化接口

    // 内存缓存
    dbName2ID        map[string]int64                   // DB 名称 -> ID
    dbID2Meta        map[int64]*model.Database          // DB ID -> 元数据
    collName2ID      map[string]map[string]int64        // DB名称 -> Collection名称 -> ID
    collID2Meta      map[int64]*model.Collection        // Collection ID -> 元数据
    collAlias2ID     map[string]map[string]int64        // DB名称 -> Alias -> Collection ID
    partID2Meta      map[int64]*model.Partition         // Partition ID -> 元数据

    // 并发控制
    ddLock           sync.RWMutex                       // 读写锁

    // TSO 分配器
    tsoAllocator     tso.Allocator
}

元数据操作示例 - 创建 Collection

// internal/rootcoord/meta_table.go
func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) error {
    mt.ddLock.Lock()
    defer mt.ddLock.Unlock()

    // 1. 检查 Collection 是否已存在
    if _, ok := mt.collID2Meta[coll.CollectionID]; ok {
        return fmt.Errorf("collection already exists: %d", coll.CollectionID)
    }

    // 2. 检查名称冲突
    dbName := coll.DBName
    if collID, ok := mt.collName2ID[dbName][coll.Name]; ok {
        return fmt.Errorf("collection name already exists: %s", coll.Name)
    }

    // 3. 持久化到 etcd
    if err := mt.catalog.CreateCollection(ctx, coll, coll.CreateTime); err != nil {
        return err
    }

    // 4. 更新内存缓存
    mt.collID2Meta[coll.CollectionID] = coll
    if mt.collName2ID[dbName] == nil {
        mt.collName2ID[dbName] = make(map[string]int64)
    }
    mt.collName2ID[dbName][coll.Name] = coll.CollectionID

    // 5. 更新 Partition 缓存
    for _, partition := range coll.Partitions {
        mt.partID2Meta[partition.PartitionID] = partition
    }

    log.Info("collection added successfully",
        zap.Int64("collectionID", coll.CollectionID),
        zap.String("collectionName", coll.Name))

    return nil
}

元数据查询示例

func (mt *MetaTable) GetCollectionByName(
    ctx context.Context,
    dbName string,
    collectionName string,
    ts Timestamp,
) (*model.Collection, error) {
    mt.ddLock.RLock()
    defer mt.ddLock.RUnlock()

    // 1. 从内存缓存查找 Collection ID
    collID, ok := mt.collName2ID[dbName][collectionName]
    if !ok {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }

    // 2. 根据 ID 获取 Collection 元数据
    coll, ok := mt.collID2Meta[collID]
    if !ok {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }

    // 3. 检查时间戳(MVCC)
    if ts != 0 && coll.CreateTime > ts {
        return nil, merr.WrapErrCollectionNotFound(collectionName)
    }

    // 4. 克隆返回(避免外部修改)
    return model.CloneCollection(coll), nil
}

关键设计点

  1. 双层索引:通过 collName2IDcollID2Meta 实现名称到ID,再到完整元数据的快速查找。
  2. 读写锁:使用 sync.RWMutex 保证并发安全,支持多读单写。
  3. MVCC:通过时间戳参数实现多版本并发控制,支持历史数据快照查询。
  4. 缓存一致性:采用先持久化到etcd,再更新内存缓存的策略,确保数据不丢失。

3. 任务调度器深度解析

Scheduler是RootCoord的任务调度引擎,负责DDL任务的排队、排序和执行。

// internal/rootcoord/scheduler.go
type scheduler struct {
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup

    // 分配器
    idAllocator  allocator.Interface  // ID 分配器
    tsoAllocator tso.Allocator        // TSO 分配器

    // 任务队列
    taskChan chan task                // 任务通道
    taskHeap typeutil.Heap[task]      // 任务堆(按时间戳排序)

    // 并发控制
    lock sync.Mutex

    // 分布式锁
    clusterLock    *lock.KeyLock[string]    // 集群级锁
    databaseLock   *lock.KeyLock[string]    // Database 级锁
    collectionLock *lock.KeyLock[string]    // Collection 级锁

    // 最小 DDL 时间戳
    minDdlTs atomic.Uint64
}

任务调度流程

// 1. 添加任务
func (s *scheduler) AddTask(t task) error {
    // 分配时间戳
    ts, err := s.tsoAllocator.GenerateTSO(1)
    if err != nil {
        return err
    }
    t.SetTs(ts)

    // 分配任务 ID
    id, err := s.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    t.SetID(id)

    // 发送到任务通道
    select {
    case s.taskChan <- t:
        return nil
    case <-s.ctx.Done():
        return s.ctx.Err()
    }
}

// 2. 任务循环处理
func (s *scheduler) taskLoop() {
    defer s.wg.Done()

    for {
        select {
        case <-s.ctx.Done():
            return
        case task := <-s.taskChan:
            // 将任务加入堆
            s.lock.Lock()
            s.taskHeap.Push(task)
            s.lock.Unlock()

            // 尝试执行任务
            s.tryExecuteTasks()
        }
    }
}

// 3. 执行任务
func (s *scheduler) tryExecuteTasks() {
    s.lock.Lock()
    defer s.lock.Unlock()

    for s.taskHeap.Len() > 0 {
        // 获取时间戳最小的任务
        t := s.taskHeap.Peek()

        // 检查是否可以执行
        if !s.canExecute(t) {
            break
        }

        // 从堆中移除
        s.taskHeap.Pop()

        // 异步执行任务
        go s.executeTask(t)
    }
}

// 4. 执行单个任务
func (s *scheduler) executeTask(t task) {
    // 获取锁
    locks := s.acquireLocks(t)
    defer s.releaseLocks(locks)

    // 准备阶段
    if err := t.Prepare(s.ctx); err != nil {
        log.Error("task prepare failed", zap.Error(err))
        t.NotifyDone(err)
        return
    }

    // 执行阶段
    if err := t.Execute(s.ctx); err != nil {
        log.Error("task execute failed", zap.Error(err))
        t.NotifyDone(err)
        return
    }

    // 通知完成
    t.NotifyDone(nil)
}

任务调度流程图

┌──────────────┐
│ Client 请求  │
└──────┬───────┘
       │
       ▼
┌──────────────────────────────┐
│  Proxy 转发到 RootCoord      │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  创建 Task 对象               │
│  - CreateCollectionTask      │
│  - DropCollectionTask        │
│  - AlterCollectionTask       │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  Scheduler.AddTask()         │
│  1. 分配 TSO 时间戳          │
│  2. 分配任务 ID              │
│  3. 加入任务通道             │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  taskLoop() 接收任务         │
│  1. 从 channel 接收          │
│  2. 加入最小堆(按 TS 排序) │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  tryExecuteTasks()           │
│  1. 检查是否可执行           │
│  2. 获取分布式锁             │
│  3. 异步执行任务             │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  executeTask()               │
│  1. Prepare() - 准备阶段     │
│  2. Execute() - 执行阶段     │
│  3. NotifyDone() - 通知完成  │
└──────────────┬───────────────┘
               │
               ▼
┌──────────────────────────────┐
│  返回结果给 Client           │
└──────────────────────────────┘

分布式锁机制

// 获取任务所需的锁
func (s *scheduler) acquireLocks(t task) []string {
    locks := make([]string, 0)

    // 根据任务类型获取不同级别的锁
    switch t.Type() {
    case commonpb.MsgType_CreateCollection:
        // Collection 级锁
        key := fmt.Sprintf("collection_%s", t.GetCollectionName())
        s.collectionLock.Lock(key)
        locks = append(locks, key)

    case commonpb.MsgType_DropDatabase:
        // Database 级锁
        key := fmt.Sprintf("database_%s", t.GetDatabaseName())
        s.databaseLock.Lock(key)
        locks = append(locks, key)

    case commonpb.MsgType_CreateCredential:
        // 集群级锁
        s.clusterLock.Lock("rbac")
        locks = append(locks, "rbac")
    }

    return locks
}

关键设计点

  1. 时间戳排序:使用最小堆按TSO时间戳对任务排序,保证了DDL操作的全局顺序性。
  2. 分层锁机制:设计Cluster > Database > Collection三级锁,有效减少锁冲突范围。
  3. 异步执行:任务执行在独立的goroutine中进行,提高了系统的整体并发度。
  4. 优雅降级:单个任务的失败不会阻塞或影响其他任务的正常执行。

🔍 源码深度剖析

1. DDL 操作完整链路 - 以 CreateCollection 为例

跟踪一个完整的CreateCollection请求的调用链路。

Step 1: Proxy 接收请求
// internal/proxy/impl.go
func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
    // 1. 参数验证
    if err := validateCollectionName(request.CollectionName); err != nil {
        return &commonpb.Status{ErrorCode: commonpb.ErrorCode_IllegalArgument}, err
    }

    // 2. 转发到 RootCoord
    return node.rootCoord.CreateCollection(ctx, request)
}
Step 2: RootCoord 创建任务
// internal/rootcoord/root_coord.go
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
    // 1. 创建任务对象
    t := &createCollectionTask{
        baseTask: baseTask{
            ctx:  ctx,
            core: c,
            done: make(chan error, 1),
        },
        Req: in,
    }

    // 2. 提交到调度器
    if err := c.scheduler.AddTask(t); err != nil {
        return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, err
    }

    // 3. 等待任务完成
    if err := t.WaitToFinish(); err != nil {
        return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, err
    }

    return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
Step 3: 任务准备阶段
// internal/rootcoord/create_collection_task.go
func (t *createCollectionTask) Prepare(ctx context.Context) error {
    // 1. 分配 Collection ID
    collID, err := t.core.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    t.collectionID = collID

    // 2. 解析 Schema
    schema, err := parseSchema(t.Req.Schema)
    if err != nil {
        return err
    }
    t.schema = schema

    // 3. 分配 Partition ID(默认分区)
    partitionID, err := t.core.idAllocator.AllocOne()
    if err != nil {
        return err
    }
    t.partitionID = partitionID

    // 4. 分配 VChannel
    channels, err := t.core.channelManager.AllocChannels(ctx, t.collectionID)
    if err != nil {
        return err
    }
    t.channels = channels

    return nil
}
Step 4: 任务执行阶段
func (t *createCollectionTask) Execute(ctx context.Context) error {
    // 1. 构建 Collection 对象
    coll := &model.Collection{
        CollectionID:   t.collectionID,
        Name:           t.Req.CollectionName,
        DBName:         t.Req.DbName,
        Schema:         t.schema,
        CreateTime:     t.GetTs(),
        State:          pb.CollectionState_CollectionCreating,
        VirtualChannels: t.channels,
        Partitions: []*model.Partition{
            {
                PartitionID:   t.partitionID,
                PartitionName: "_default",
                State:         pb.PartitionState_PartitionCreated,
            },
        },
    }

    // 2. 添加到 MetaTable
    if err := t.core.meta.AddCollection(ctx, coll); err != nil {
        return err
    }

    // 3. 通知 DataCoord 创建 VChannel
    if err := t.core.broker.WatchChannels(ctx, &datapb.WatchChannelsRequest{
        CollectionID: t.collectionID,
        Channels:     t.channels,
    }); err != nil {
        // 回滚:删除元数据
        t.core.meta.RemoveCollection(ctx, t.collectionID, t.GetTs())
        return err
    }

    // 4. 更新 Collection 状态为 Created
    coll.State = pb.CollectionState_CollectionCreated
    if err := t.core.meta.AlterCollection(ctx, coll, t.GetTs()); err != nil {
        return err
    }

    log.Info("collection created successfully",
        zap.Int64("collectionID", t.collectionID),
        zap.String("collectionName", t.Req.CollectionName))

    return nil
}

完整时序图

Client          Proxy         RootCoord       Scheduler       MetaTable       DataCoord
 |               |               |               |               |               |
 |─CreateColl──>│               |               |               |               |
 |               │─CreateColl──>│               |               |               |
 |               |               │─CreateTask──>│               |               |
 |               |               |               |               |               |
 |               |               │<─AddTask─────│               |               |
 |               |               |               |               |               |
 |               |               |               │─AllocID────>│               |
 |               |               |               │<─CollID──────│               |
 |               |               |               |               |               |
 |               |               |               │─Execute────>│               |
 |               |               |               |               │─AddColl────>│
 |               |               |               |               │<─OK──────────│
 |               |               |               |               |               |
 |               |               |               │─WatchChan──────────────────>│
 |               |               |               │<─OK──────────────────────────│
 |               |               |               |               |               |
 |               |               │<─Done─────────│               |               |
 |               │<─Success──────│               |               |               |
 │<─Success──────│               |               |               |               |

2. TSO 时间戳分配机制

TSO(Timestamp Oracle)是Milvus实现分布式一致性与操作顺序性的关键组件。

// internal/tso/global_allocator.go
type GlobalAllocator struct {
    mu sync.Mutex

    // TSO 组件
    tso         uint64        // 当前时间戳
    lastSavedTime time.Time   // 上次保存的物理时间

    // etcd 存储
    rootPath    string
    txn         kv.TxnKV

    // 配置
    updateInterval time.Duration  // 更新间隔
}

// 生成 TSO
func (ga *GlobalAllocator) GenerateTSO(count uint32) (uint64, error) {
    ga.mu.Lock()
    defer ga.mu.Unlock()

    // 1. 获取当前物理时间
    physical := time.Now().UnixMilli()

    // 2. 检查时钟回退
    if physical < ga.lastSavedTime.UnixMilli() {
        return 0, errors.New("clock moved backwards")
    }

    // 3. 生成时间戳
    // TSO 格式:[物理时间 46 位][逻辑时间 18 位]
    ts := uint64(physical) << 18

    // 4. 增加逻辑时间
    ga.tso += uint64(count)
    ts |= ga.tso & 0x3FFFF // 取低 18 位

    // 5. 定期持久化到 etcd
    if time.Since(ga.lastSavedTime) > ga.updateInterval {
        if err := ga.saveTimestamp(physical); err != nil {
            return 0, err
        }
        ga.lastSavedTime = time.Now()
    }

    return ts, nil
}

TSO 格式解析

┌─────────────────────────────────────────────────────────────┐
│                    64 位 TSO 时间戳                          │
├──────────────────────────────────┬──────────────────────────┤
│      物理时间(46 位)            │   逻辑时间(18 位)       │
│   Unix 毫秒时间戳                 │   单调递增计数器          │
└──────────────────────────────────┴──────────────────────────┘
示例:
物理时间:1701878400000 (2023-12-06 20:00:00)
逻辑时间:12345
TSO = (1701878400000 << 18) | 12345
    = 446402150400000000 | 12345
    = 446402150400012345

TSO 的作用

  1. 全局顺序:保证所有DDL操作有一个全局一致的顺序。
  2. MVCC:支持基于时间戳的多版本并发控制。
  3. 时间旅行:支持查询历史某个时刻的数据状态。
  4. 分布式事务:作为分布式事务的时间戳标识。

3. DDL Callback 机制详解

DDL Callback是RootCoord将DDL操作逻辑解耦和执行的核心机制。

// internal/rootcoord/ddl_callbacks.go
// DDL Callback 接口
type DDLCallback interface {
    // 执行 DDL 操作
    Execute(ctx context.Context) error

    // 回滚操作
    Rollback(ctx context.Context) error

    // 获取操作类型
    Type() string
}

// 基础 Callback
type baseCallback struct {
    core *Core
    ts   Timestamp
}

CreateCollection Callback 实现

// internal/rootcoord/ddl_callbacks_create_collection.go
type createCollectionCallback struct {
    baseCallback

    req          *milvuspb.CreateCollectionRequest
    collectionID UniqueID
    partitionID  UniqueID
    channels     []string
}

func (c *createCollectionCallback) Execute(ctx context.Context) error {
    // 1. 构建 Collection 模型
    coll := &model.Collection{
        CollectionID: c.collectionID,
        Name:         c.req.CollectionName,
        DBName:       c.req.DbName,
        // ... 其他字段
    }

    // 2. 添加到元数据表
    if err := c.core.meta.AddCollection(ctx, coll); err != nil {
        return err
    }

    // 3. 通知 DataCoord
    if err := c.core.broker.WatchChannels(ctx, &datapb.WatchChannelsRequest{
        CollectionID: c.collectionID,
        Channels:     c.channels,
    }); err != nil {
        // 失败则回滚
        c.Rollback(ctx)
        return err
    }

    // 4. 发送 DDL 事件
    c.core.sendDDLEvent(ctx, &DDLEvent{
        Type:         DDLEventTypeCreateCollection,
        CollectionID: c.collectionID,
        Timestamp:    c.ts,
    })

    return nil
}

func (c *createCollectionCallback) Rollback(ctx context.Context) error {
    // 回滚:删除元数据
    return c.core.meta.RemoveCollection(ctx, c.collectionID, c.ts)
}

Callback 执行流程

┌─────────────────────────────────────────────┐
│           Task.Execute()                    │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      创建 Callback 对象                     │
│  - createCollectionCallback                 │
│  - dropCollectionCallback                   │
│  - alterCollectionCallback                  │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      Callback.Execute()                     │
│  1. 更新元数据                              │
│  2. 通知其他组件                            │
│  3. 发送 DDL 事件                          │
└──────────────────┬──────────────────────────┘
                   │
                   ├─Success──> 返回成功
                   │
                   └─Failed───> Callback.Rollback()
                                   │
                                   ▼
                             ┌──────────────┐
                             │   回滚操作   │
                             │   恢复状态   │
                             └──────────────┘

4. 分布式锁实现

RootCoord采用三级锁机制来保证DDL操作的并发安全与数据一致性。

// pkg/util/lock/key_lock.go
type KeyLock[K comparable] struct {
    mu    sync.Mutex
    locks map[K]*sync.Mutex
}

func NewKeyLock[K comparable]() *KeyLock[K] {
    return &KeyLock[K]{
        locks: make(map[K]*sync.Mutex),
    }
}

func (kl *KeyLock[K]) Lock(key K) {
    kl.mu.Lock()

    // 获取或创建锁
    lock, ok := kl.locks[key]
    if !ok {
        lock = &sync.Mutex{}
        kl.locks[key] = lock
    }

    kl.mu.Unlock()

    // 加锁
    lock.Lock()
}

func (kl *KeyLock[K]) Unlock(key K) {
    kl.mu.Lock()
    defer kl.mu.Unlock()

    if lock, ok := kl.locks[key]; ok {
        lock.Unlock()
    }
}

锁的层级关系

// internal/rootcoord/scheduler.go
type LockLevel int
const (
    ClusterLock    LockLevel = iota // 集群级锁(RBAC 操作)
    DatabaseLock                    // Database 级锁
    CollectionLock                  // Collection 级锁
)

// 获取任务所需的锁
func (s *scheduler) getLockKeys(t task) map[LockLevel][]string {
    locks := make(map[LockLevel][]string)

    switch t.Type() {
    case commonpb.MsgType_CreateCollection:
        // Collection 级锁
        locks[CollectionLock] = []string{
            fmt.Sprintf("%s.%s", t.GetDatabaseName(), t.GetCollectionName()),
        }

    case commonpb.MsgType_DropDatabase:
        // Database 级锁(锁定整个 Database)
        locks[DatabaseLock] = []string{t.GetDatabaseName()}

    case commonpb.MsgType_CreateCredential:
        // 集群级锁(全局 RBAC 操作)
        locks[ClusterLock] = []string{"rbac"}
    }

    return locks
}

锁冲突矩阵

操作类型               锁级别          冲突操作
─────────────────────────────────────────────────
CreateCollection     Collection      同名 Collection 的所有操作
DropCollection       Collection      该 Collection 的所有操作
AlterCollection      Collection      该 Collection 的所有操作
CreatePartition      Collection      该 Collection 的 DDL 操作
CreateDatabase       Database        同名 Database 的所有操作
DropDatabase         Database        该 Database 的所有操作
AlterDatabase        Database        该 Database 的 DDL 操作
CreateCredential     Cluster         所有 RBAC 操作
CreateRole           Cluster         所有 RBAC 操作
GrantPrivilege       Cluster         所有 RBAC 操作

🛠️ 调试与实验

1. 如何调试 RootCoord

方法 1:使用 Delve 调试器
# 1. 安装 Delve
go install github.com/go-delve/delve/cmd/dlv@latest

# 2. 启动 RootCoord(调试模式)
cd milvus
dlv debug ./cmd/roles --headless --listen=:2345 --api-version=2 -- \
    --run-with-subprocess --server-type root_coord

# 3. 在 IDE 中连接调试器
# VS Code: 配置 launch.json
# GoLand: Run -> Attach to Process
方法 2:添加日志
// 在关键位置添加日志
log.Info("CreateCollection started",
    zap.String("collectionName", req.CollectionName),
    zap.Int64("collectionID", collID),
    zap.Uint64("timestamp", ts))

// 使用 zap 的结构化日志
log.Debug("task details",
    zap.Any("task", t),
    zap.Stack("stack"))
方法 3:使用 pprof 性能分析
import _ "net/http/pprof"

// 在 RootCoord 启动时开启 pprof
go func() {
    http.ListenAndServe("localhost:6060", nil)
}()
# 查看 CPU profile
go tool pprof http://localhost:6060/debug/pprof/profile

# 查看内存 profile
go tool pprof http://localhost:6060/debug/pprof/heap

# 查看 goroutine
go tool pprof http://localhost:6060/debug/pprof/goroutine

2. 实验:跟踪 CreateCollection 流程

通过一个简单的Python实验来观察CreateCollection的完整流程:

# experiment_create_collection.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import logging

# 开启详细日志
logging.basicConfig(level=logging.DEBUG)

# 1. 连接 Milvus
connections.connect(
    alias="default",
    host="localhost",
    port="19530")

# 2. 定义 Schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)]
schema = CollectionSchema(fields=fields, description="test collection")

# 3. 创建 Collection(观察日志)
print("Creating collection...")
collection = Collection(name="test_collection", schema=schema)
print(f"Collection created: {collection.name}")
print(f"Collection ID: {collection._collection_id}")

观察 RootCoord 日志

# 查看 RootCoord 日志
kubectl logs -f milvus-rootcoord-xxx -n milvus

# 关键日志输出:
[INFO] [rootcoord/root_coord.go:xxx] CreateCollection started
    collectionName: test_collection
    dbName: default
[INFO] [rootcoord/scheduler.go:xxx] task added to scheduler
    taskID: 12345
    taskType: CreateCollection
    timestamp: 446402150400012345
[INFO] [rootcoord/create_collection_task.go:xxx] task prepare started
    collectionID: 446402150400012346
[INFO] [rootcoord/meta_table.go:xxx] collection added to meta
    collectionID: 446402150400012346
    collectionName: test_collection
[INFO] [rootcoord/create_collection_task.go:xxx] collection created successfully
    collectionID: 446402150400012346

3. 常见问题排查

问题 1:Collection 创建失败

现象

Error: collection already exists

排查步骤

# 1. 检查 etcd 中的元数据
etcdctl get --prefix /by-dev/meta/root-coord/collection

# 2. 检查 RootCoord 内存缓存
# 通过 Birdwatcher 工具
birdwatcher> show collection-info --collection test_collection

# 3. 检查是否有残留的 tombstone
etcdctl get --prefix /by-dev/meta/root-coord/collection-tombstone

解决方案

// 如果是缓存不一致,重启 RootCoord
// 或者使用 Birdwatcher 清理

// 代码层面:添加重试逻辑
func (c *Core) CreateCollection(ctx context.Context, req *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
    // 检查是否已存在
    if coll, _ := c.meta.GetCollectionByName(ctx, req.DbName, req.CollectionName, 0); coll != nil {
        // 如果 Schema 相同,返回成功(幂等性)
        if isSameSchema(coll.Schema, req.Schema) {
            return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
        }
        return nil, errors.New("collection already exists with different schema")
    }

    // 继续创建流程...
}
问题 2:TSO 时钟回退

现象

Error: clock moved backwards

原因分析

  • 系统时间被手动调整
  • NTP同步导致时间跳变
  • 虚拟机时间不同步

解决方案

// internal/tso/global_allocator.go
func (ga *GlobalAllocator) GenerateTSO(count uint32) (uint64, error) {
    ga.mu.Lock()
    defer ga.mu.Unlock()

    physical := time.Now().UnixMilli()

    // 检测时钟回退
    if physical < ga.lastSavedTime.UnixMilli() {
        // 等待时间追上
        waitTime := ga.lastSavedTime.Sub(time.Now())
        if waitTime > time.Second {
            // 回退超过 1 秒,报错
            return 0, errors.New("clock moved backwards too much")
        }

        // 短暂等待
        time.Sleep(waitTime)
        physical = time.Now().UnixMilli()
    }

    // 继续生成 TSO...
}
问题 3:DDL 操作卡住

现象

CreateCollection 请求一直不返回

排查步骤

# 1. 检查 Scheduler 状态
curl http://localhost:9091/metrics | grep rootcoord_ddl

# 2. 检查是否有死锁
# 查看 goroutine 堆栈
curl http://localhost:6060/debug/pprof/goroutine?debug=2

# 3. 检查任务队列
# 通过 Birdwatcher
birdwatcher> show ddl-queue

解决方案

// 添加超时机制
func (t *createCollectionTask) Execute(ctx context.Context) error {
    // 设置超时
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    // 执行操作
    if err := t.core.meta.AddCollection(ctx, coll); err != nil {
        return err
    }

    // 通知 DataCoord(带超时)
    done := make(chan error, 1)
    go func() {
        done <- t.core.broker.WatchChannels(ctx, req)
    }()

    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return errors.New("operation timeout")
    }
}

📚 扩展阅读

1. 相关设计文档

  • RootCoord Design Doc
  • TSO Design
  • DDL Execution

2. 相关论文

  • Percolator: Large-scale Incremental Processing Using Distributed Transactions and Notifications - Google的分布式事务系统,TSO设计的灵感来源之一。
  • Spanner: Google's Globally-Distributed Database - 关于TrueTime和全局时间戳实现的经典论文。
  • etcd: Distributed reliable key-value store - 深入了解Raft一致性算法和分布式锁的实现。

3. 源码阅读建议

推荐阅读顺序

  1. 入门(1-2 天)
    • root_coord.go - 理解整体结构和服务生命周期。
    • scheduler.go - 理解任务调度的核心逻辑。
    • meta_table.go - 理解元数据在内存中的管理方式。
  2. 进阶(3-5 天)
    • create_collection_task.go - 深入理解一个典型DDL任务的完整实现。
    • ddl_callbacks_*.go - 理解解耦业务逻辑的Callback机制。
    • internal/tso/ - 深入理解全局时间戳服务的实现细节。
  3. 高级(1 周)
    • internal/metastore/ - 理解元数据如何持久化到etcd。
    • internal/allocator/ - 理解全局唯一ID的分配算法。
    • internal/util/sessionutil/ - 理解服务注册与发现机制。

调试技巧

// 1. 在关键路径添加断点
// root_coord.go:CreateCollection
// scheduler.go:AddTask
// meta_table.go:AddCollection

// 2. 使用条件断点
// 只在特定 Collection 时触发
if req.CollectionName == "test_collection" {
    // 断点位置
}

// 3. 观察变量
// - t.collectionID
// - t.ts
// - coll.State

4. 贡献代码指南

如果你想为RootCoord模块贡献代码,可以从以下方向入手:

常见贡献方向

  1. 性能优化
    • 优化元数据查询性能,减少锁竞争。
    • 改进任务调度算法,提高吞吐量。
  2. 功能增强
    • 支持更多种类的DDL操作。
    • 增强RBAC权限管理的细粒度。
    • 改进错误处理与回滚机制。
  3. 测试完善
    • 为关键函数添加单元测试。
    • 编写集成测试,覆盖复杂场景。
    • 添加性能基准测试。

提交 PR 流程

# 1. Fork 仓库
git clone https://github.com/your-username/milvus.git

# 2. 创建分支
git checkout -b feature/improve-rootcoord

# 3. 修改代码
# 编辑 internal/rootcoord/xxx.go

# 4. 运行测试
make test

# 5. 提交代码
git commit -m "feat: improve RootCoord performance"
git push origin feature/improve-rootcoord

# 6. 创建 Pull Request
# 在 GitHub 上创建 PR

💡 总结

核心要点回顾

  1. RootCoord 是 Milvus 的元数据管理中心:负责管理所有Database、Collection、Partition的元数据,并协调所有DDL操作。
  2. 三大核心组件
    • MetaTable:元数据管理,采用双层索引与MVCC机制。
    • Scheduler:任务调度,基于时间戳排序和分层锁保证顺序与安全。
    • TSO Allocator:提供全局递增的时间戳,是分布式系统一致性的基石。
  3. DDL 执行流程:Proxy接收请求 → RootCoord创建任务 → Scheduler调度执行 → Callback执行具体操作 → 更新元数据并通知其他组件。
  4. 关键设计模式
    • 任务模式:统一的任务接口与执行框架。
    • Callback 模式:将DDL操作的业务逻辑解耦,便于扩展和维护。
    • 分层锁:减少锁冲突,提高并发度。
    • MVCC:支持多版本并发控制与时间旅行查询。

最佳实践建议

  1. 元数据设计
    • 使用双层索引加速高频查询。
    • 合理利用内存缓存,减少对底层存储(如etcd)的访问。
    • 建立定期清理机制,移除过期的逻辑删除记录(tombstone)。
  2. 任务调度
    • 根据业务场景合理设置任务优先级。
    • 避免在任务执行中长时间持有锁。
    • 为所有关键操作实现超时与重试机制。
  3. 错误处理
    • 实现完善的回滚(Rollback)机制,保证操作原子性。
    • 对可恢复错误(如网络波动)添加自动重试逻辑。
    • 记录详细、结构化的错误日志,便于快速定位问题。
  4. 性能优化
    • 对批量操作进行合并,减少RPC调用次数。
    • 将非关键路径的操作(如发送通知)异步化。
    • 使用连接池复用与其他组件的网络连接。



上一篇:Qt实现无人机实时轨迹平滑绘制与地图交互
下一篇:嵌入式Linux驱动开发详解:字符设备驱动从原理到实践
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 01:35 , Processed in 0.111027 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表