找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1107

积分

0

好友

159

主题
发表于 前天 16:20 | 查看: 4| 回复: 0

📋 概述

DataCoord 的定位

DataCoord(Data Coordinator)是 Milvus 的“数据管家”,负责整个集群的数据生命周期管理。它的核心职责包括:

  1. Segment 管理:分配、封存、合并 Segment
  2. Channel 管理:管理 DML Channel 的分配和负载均衡
  3. Compaction 调度:触发和调度数据压缩任务
  4. Index 构建:调度索引构建任务
  5. 垃圾回收:清理过期的数据和元数据

核心功能列表

DataCoord 核心功能
├── Segment 生命周期管理
│   ├── Segment 分配(Growing)
│   ├── Segment 封存(Sealed)
│   ├── Segment 刷盘(Flushed)
│   └── Segment 删除(Dropped)
├── Compaction 压缩
│   ├── L0 Compaction(Delta 合并)
│   ├── Mix Compaction(小文件合并)
│   ├── Clustering Compaction(聚簇优化)
│   └── Compaction 任务调度
├── Index 索引管理
│   ├── Index 构建任务分配
│   ├── Index 进度跟踪
│   └── Index 版本管理
├── Channel 管理
│   ├── Channel 分配
│   ├── Channel 负载均衡
│   └── Channel 故障恢复
└── 垃圾回收
    ├── Segment GC
    ├── Binlog GC
    └── Index GC

与其他组件的关系

┌─────────────────────────────────────────────┐
│              RootCoord                       │
│        (通知 Collection 创建)                │
└──────────────────┬──────────────────────────┘
                   │ DDL 事件
                   ▼
┌─────────────────────────────────────────────┐
│              DataCoord                       │
│  ┌─────────────────────────────────────┐    │
│  │    SegmentManager (Segment 管理)    │    │
│  └─────────────────────────────────────┘    │
│  ┌─────────────────────────────────────┐    │
│  │  CompactionTrigger (压缩触发器)     │    │
│  └─────────────────────────────────────┘    │
│  ┌─────────────────────────────────────┐    │
│  │    IndexService (索引服务)          │    │
│  └─────────────────────────────────────┘    │
│  ┌─────────────────────────────────────┐    │
│  │  GarbageCollector (垃圾回收)        │    │
│  └─────────────────────────────────────┘    │
└──────────┬───────────┬──────────┬───────────┘
           │           │          │
           │ 分配任务  │ 监控     │ 清理
           ▼           ▼          ▼
    ┌──────────┐ ┌──────────┐ ┌──────────┐
    │DataNode  │ │QueryNode │ │ MinIO    │
    │(执行任务)│ │(查询数据)│ │(存储)    │
    └──────────┘ └──────────┘ └──────────┘

🗂️ 源码结构

目录结构

internal/datacoord/
├── server.go                      # 主服务实现
├── meta.go                        # 元数据管理
├── segment_manager.go             # Segment 管理器
├── channel.go                     # Channel 管理
│
# Segment 相关
├── segment_info.go                # Segment 信息
├── segment_operator.go            # Segment 操作
├── segment_allocation_policy.go   # Segment 分配策略
│
# Compaction 相关
├── compaction_trigger.go          # Compaction 触发器
├── compaction_trigger_v2.go       # 新版触发器
├── compaction_inspector.go        # Compaction 检查器
├── compaction_policy_*.go         # Compaction 策略
├── compaction_task_*.go           # Compaction 任务
├── compaction_queue.go            # Compaction 队列
├── compaction_view.go             # Compaction 视图
│
# Index 相关
├── index_service.go               # 索引服务
├── index_meta.go                  # 索引元数据
├── index_inspector.go             # 索引检查器
├── index_engine_version_manager.go # 索引引擎版本管理
│
# Import 相关
├── import_job.go                  # 导入任务
├── import_meta.go                 # 导入元数据
├── import_task_*.go               # 导入任务实现
├── import_inspector.go            # 导入检查器
├── import_checker.go              # 导入检查器
│
# 其他
├── garbage_collector.go           # 垃圾回收
├── handler.go                     # 请求处理器
├── services.go                    # gRPC 服务实现
├── allocator/                     # ID 分配器
├── broker/                        # 与其他组件通信
├── session/                       # 会话管理
└── task/                          # 任务系统

核心数据结构

让我们先看看 DataCoord 的核心结构体:

// internal/datacoord/server.go
// Server implements `types.DataCoord`
type Server struct {
    ctx              context.Context
    serverLoopCtx    context.Context
    serverLoopCancel context.CancelFunc
    serverLoopWg     sync.WaitGroup
    quitCh           chan struct{}
    stateCode        atomic.Value

    // 客户端连接
    etcdCli        *clientv3.Client      // etcd 客户端
    tikvCli        *txnkv.Client         // TiKV 客户端(可选)

    // 核心组件
    meta             *meta                // 元数据管理
    segmentManager   Manager              // Segment 管理器
    allocator        allocator.Allocator  // ID 分配器
    idAllocator      *GlobalIDAllocator   // 全局 ID 分配器

    // 节点管理
    nodeManager      session.NodeManager  // DataNode 管理
    cluster          session.Cluster      // 集群管理

    // Compaction 相关
    compactionTrigger        trigger              // Compaction 触发器
    compactionInspector      CompactionInspector  // Compaction 检查器
    compactionTriggerManager TriggerManager       // Compaction 触发管理器

    // Index 相关
    indexService     *indexService        // 索引服务
    indexMeta        *indexMeta           // 索引元数据

    // Import 相关
    importMeta       ImportMeta           // 导入元数据
    importInspector  ImportInspector      // 导入检查器
    importChecker    ImportChecker        // 导入检查器

    // 垃圾回收
    garbageCollector *garbageCollector    // 垃圾回收器

    // 通道
    flushCh          chan UniqueID        // 刷盘通道
    notifyIndexChan  chan UniqueID        // 索引通知通道

    // 会话
    session          sessionutil.SessionInterface  // 服务会话
    dnSessionWatcher sessionutil.SessionWatcher    // DataNode 会话监听
    qnSessionWatcher sessionutil.SessionWatcher    // QueryNode 会话监听
}

关键字段说明

  1. meta:管理所有 Segment、Channel、Index 的元数据
  2. segmentManager:负责 Segment 的分配、封存、删除
  3. compactionTrigger:定期触发 Compaction 任务
  4. indexService:管理索引构建任务
  5. garbageCollector:清理过期的数据和元数据

🚀 核心流程解析

1. DataCoord 启动流程

// internal/datacoord/server.go
func (s *Server) Init() error {
    // 1. 初始化 etcd 客户端
    s.initEtcd()

    // 2. 初始化会话(注册到 etcd)
    s.initSession()

    // 3. 初始化 ID 分配器
    s.initIDAllocator()

    // 4. 初始化元数据
    s.initMeta()

    // 5. 初始化 Segment 管理器
    s.initSegmentManager()

    // 6. 初始化 Compaction 组件
    s.initCompaction()

    // 7. 初始化 Index 服务
    s.initIndexService()

    // 8. 初始化垃圾回收器
    s.initGarbageCollector()

    // 9. 初始化节点管理器
    s.initNodeManager()

    return nil
}

func (s *Server) Start() error {
    // 1. 启动节点管理器(监听 DataNode)
    s.nodeManager.Start()

    // 2. 启动 Segment 管理器
    s.segmentManager.Start()

    // 3. 启动 Compaction 触发器
    s.compactionTrigger.start()

    // 4. 启动 Index 服务
    s.indexService.Start()

    // 5. 启动垃圾回收器
    s.garbageCollector.start()

    // 6. 启动后台任务
    s.startServerLoop()

    // 7. 更新服务状态
    s.UpdateStateCode(commonpb.StateCode_Healthy)

    return nil
}

启动流程图

┌─────────────────────────────────────────────┐
│            DataCoord.Init()                 │
└──────────────────┬──────────────────────────┘
                   │
    ┌──────────────┼──────────────┐
    │              │              │
    ▼              ▼              ▼
┌────────┐   ┌─────────┐   ┌──────────┐
│ etcd   │   │ Session │   │ ID/Meta  │
│ Client │   │ Register│   │Allocator │
└────────┘   └─────────┘   └──────────┘
                   │
                   ▼
        ┌─────────────────┐
        │  SegmentManager │
        │  (初始化)        │
        └─────────────────┘
                   │
                   ▼
        ┌─────────────────┐
        │  Compaction     │
        │  (初始化触发器)  │
        └─────────────────┘
                   │
                   ▼
        ┌─────────────────┐
        │  IndexService   │
        │  (初始化)        │
        └─────────────────┘
                   │
                   ▼
        ┌─────────────────┐
        │  DataCoord      │
        │   Ready!        │
        └─────────────────┘

2. Segment 管理核心实现

SegmentManager 是 DataCoord 最核心的组件,负责 Segment 的整个生命周期:

// internal/datacoord/segment_manager.go
type SegmentManager struct {
    meta      *meta                // 元数据
    allocator allocator.Allocator  // ID 分配器
    helper    allocHelper          // 辅助工具

    // 并发控制
    channelLock     *lock.KeyLock[string]                     // Channel 级锁
    channel2Growing *typeutil.ConcurrentMap[string, UniqueSet] // Channel -> Growing Segments
    channel2Sealed  *typeutil.ConcurrentMap[string, UniqueSet] // Channel -> Sealed Segments

    // 策略
    estimatePolicy      calUpperLimitPolicy  // 容量估算策略
    allocPolicy         AllocatePolicy       // 分配策略
    segmentSealPolicies []SegmentSealPolicy  // 封存策略
    channelSealPolicies []channelSealPolicy  // Channel 封存策略
    flushPolicy         flushPolicy          // 刷盘策略
}

Segment 分配流程

// internal/datacoord/segment_manager.go
func (s *SegmentManager) AllocSegment(
    ctx context.Context,
    collectionID, partitionID UniqueID,
    channelName string,
    requestRows int64,
    storageVersion int64,
) ([]*Allocation, error) {
    // 1. 获取 Channel 锁
    s.channelLock.Lock(channelName)
    defer s.channelLock.Unlock(channelName)

    // 2. 获取或创建 Growing Segment
    segment, err := s.getOrCreateGrowingSegment(ctx, collectionID, partitionID, channelName, storageVersion)
    if err != nil {
        return nil, err
    }

    // 3. 检查 Segment 容量
    if !s.canAllocate(segment, requestRows) {
        // 封存当前 Segment
        if err := s.sealSegment(ctx, segment.GetID()); err != nil {
            return nil, err
        }

        // 创建新的 Growing Segment
        segment, err = s.createNewSegment(ctx, collectionID, partitionID, channelName, storageVersion)
        if err != nil {
            return nil, err
        }
    }

    // 4. 分配空间
    allocation := &Allocation{
        SegmentID:  segment.GetID(),
        NumOfRows:  requestRows,
        ExpireTime: s.calculateExpireTime(),
    }

    // 5. 更新 Segment 元数据
    segment.NumOfRows += requestRows
    if err := s.meta.UpdateSegment(segment); err != nil {
        return nil, err
    }

    return []*Allocation{allocation}, nil
}

Segment 状态转换

┌─────────────────────────────────────────────┐
│          Segment 生命周期                    │
└─────────────────────────────────────────────┘
    ┌──────────┐
    │  New     │ 创建新 Segment
    └────┬─────┘
         │ AllocSegment()
         ▼
    ┌──────────┐
    │ Growing  │ 正在写入数据
    └────┬─────┘
         │ 达到容量/超时
         │ SealSegment()
         ▼
    ┌──────────┐
    │ Sealed   │ 已封存,等待刷盘
    └────┬─────┘
         │ FlushSegment()
         ▼
    ┌──────────┐
    │ Flushed  │ 已刷盘到对象存储
    └────┬─────┘
         │ Compaction
         ▼
    ┌──────────┐
    │Compacted │ 已压缩
    └────┬─────┘
         │ GC
         ▼
    ┌──────────┐
    │ Dropped  │ 已删除
    └──────────┘

Segment 封存策略

// internal/datacoord/segment_allocation_policy.go
// Segment 封存策略接口
type SegmentSealPolicy interface {
    ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string)
}

// 1. 容量策略:Segment 达到最大行数
type sealByCapacityPolicy struct {
    maxRowNum int64
}

func (p *sealByCapacityPolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
    if segment.NumOfRows >= p.maxRowNum {
        return true, fmt.Sprintf("segment %d reaches max row num %d", segment.ID, p.maxRowNum)
    }
    return false, ""
}

// 2. 生命周期策略:Segment 存在时间过长
type sealByLifetimePolicy struct {
    lifetime time.Duration
}

func (p *sealByLifetimePolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
    createTime := tsoutil.PhysicalTime(segment.CreateTime)
    currentTime := tsoutil.PhysicalTime(ts)

    if currentTime.Sub(createTime) > p.lifetime {
        return true, fmt.Sprintf("segment %d exceeds lifetime %v", segment.ID, p.lifetime)
    }
    return false, ""
}

// 3. 空闲策略:Segment 长时间没有写入
type sealByIdlePolicy struct {
    idleTime time.Duration
}

func (p *sealByIdlePolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
    lastModifyTime := tsoutil.PhysicalTime(segment.LastModifyTime)
    currentTime := tsoutil.PhysicalTime(ts)

    if currentTime.Sub(lastModifyTime) > p.idleTime {
        return true, fmt.Sprintf("segment %d is idle for %v", segment.ID, p.idleTime)
    }
    return false, ""
}

3. Compaction 机制深度解析

Compaction 是 DataCoord 的核心功能之一,用于优化数据存储和查询性能:

Compaction 类型
// pkg/proto/datapb/data_coord.proto
enum CompactionType {
    UndefinedCompaction = 0;
    MixCompaction = 1;        // 混合压缩:合并小 Segment
    L0Compaction = 2;         // L0 压缩:合并 Delta 数据
    ClusteringCompaction = 3; // 聚簇压缩:按主键排序
}

Compaction 类型对比

┌─────────────────────────────────────────────────────────────┐
│                  Compaction 类型对比                         │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ 类型         │ 目的         │ 触发条件     │ 效果           │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ Mix          │ 合并小文件   │ 小 Segment   │ 减少文件数     │
│ Compaction   │ 减少碎片     │ 数量过多     │ 提升查询性能   │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ L0           │ 合并 Delta   │ Delete 操作  │ 清理删除数据   │
│ Compaction   │ 应用删除     │ 累积过多     │ 减少存储空间   │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ Clustering   │ 数据聚簇     │ 手动触发     │ 优化范围查询   │
│ Compaction   │ 主键排序     │ 或定期执行   │ 提升过滤性能   │
└──────────────┴──────────────┴──────────────┴────────────────┘
Compaction 触发器实现
// internal/datacoord/compaction_trigger.go
type compactionTrigger struct {
    handler       Handler
    meta          *meta
    allocator     allocator.Allocator

    // 信号通道
    signals       chan *compactionSignal  // 自动触发信号
    manualSignals chan *compactionSignal  // 手动触发信号

    // 检查器
    inspector     CompactionInspector

    // 定时器
    globalTrigger *time.Ticker

    // 策略
    estimateNonDiskSegmentPolicy calUpperLimitPolicy
    estimateDiskSegmentPolicy    calUpperLimitPolicy
}

// 启动 Compaction 触发器
func (t *compactionTrigger) start() {
    // 1. 启动全局触发器(定期检查)
    t.globalTrigger = time.NewTicker(Params.DataCoordCfg.CompactionCheckInterval.GetAsDuration(time.Second))

    // 2. 启动信号处理循环
    go t.startSignalLoop()

    // 3. 启动全局触发循环
    go t.startGlobalTriggerLoop()
}

// 全局触发循环
func (t *compactionTrigger) startGlobalTriggerLoop() {
    for {
        select {
        case <-t.closeCh:
            return
        case <-t.globalTrigger.C:
            // 检查所有 Collection 的 Compaction 需求
            t.triggerCompactionForAllCollections()
        }
    }
}

// 为所有 Collection 触发 Compaction
func (t *compactionTrigger) triggerCompactionForAllCollections() {
    collections := t.meta.GetCollections()

    for _, collectionID := range collections {
        // 获取 Collection 的所有 Channel
        channels := t.meta.GetChannelsByCollection(collectionID)

        for _, channel := range channels {
            // 检查是否需要 Compaction
            if t.shouldTriggerCompaction(collectionID, channel) {
                // 创建 Compaction 信号
                signal := NewCompactionSignal().
                    WithCollectionID(collectionID).
                    WithChannel(channel).
                    WithWaitResult(false)

                // 发送信号
                select {
                case t.signals <- signal:
                case <-t.closeCh:
                    return
                }
            }
        }
    }
}
Compaction 任务执行
// internal/datacoord/compaction_task_mix.go
type mixCompactionTask struct {
    *datapb.CompactionTask

    meta      *meta
    allocator allocator.Allocator
    handler   Handler

    // 任务状态
    state     datapb.CompactionTaskState
    startTime time.Time
    endTime   time.Time
}

// 执行 Mix Compaction
func (t *mixCompactionTask) Process() error {
    // 1. 选择要压缩的 Segment
    segments, err := t.selectSegments()
    if err != nil {
        return err
    }

    // 2. 分配新的 Segment ID
    compactedSegmentID, err := t.allocator.AllocOne()
    if err != nil {
        return err
    }

    // 3. 创建 Compaction 计划
    plan := &datapb.CompactionPlan{
        PlanID:             t.GetPlanID(),
        Type:               datapb.CompactionType_MixCompaction,
        SegmentBinlogs:     t.buildSegmentBinlogs(segments),
        CompactedSegmentID: compactedSegmentID,
        Channel:            t.GetChannel(),
    }

    // 4. 分配给 DataNode 执行
    nodeID, err := t.handler.AssignCompactionTask(plan)
    if err != nil {
        return err
    }

    // 5. 更新任务状态
    t.state = datapb.CompactionTaskState_executing
    t.NodeID = nodeID

    log.Info("mix compaction task assigned",
        zap.Int64("planID", t.GetPlanID()),
        zap.Int64("nodeID", nodeID),
        zap.Int("segmentCount", len(segments)))

    return nil
}

// 选择要压缩的 Segment
func (t *mixCompactionTask) selectSegments() ([]*SegmentInfo, error) {
    // 1. 获取 Channel 的所有 Flushed Segment
    segments := t.meta.GetSegmentsByChannel(t.GetChannel())

    // 2. 过滤条件
    candidates := make([]*SegmentInfo, 0)
    for _, segment := range segments {
        // 只选择 Flushed 状态的 Segment
        if segment.State != commonpb.SegmentState_Flushed {
            continue
        }

        // 跳过正在 Compaction 的 Segment
        if segment.IsCompacting {
            continue
        }

        // 跳过太大的 Segment
        if segment.GetSize() > t.getMaxSegmentSize() {
            continue
        }

        candidates = append(candidates, segment)
    }

    // 3. 按大小排序,优先合并小 Segment
    sort.Slice(candidates, func(i, j int) bool {
        return candidates[i].GetSize() < candidates[j].GetSize()
    })

    // 4. 选择前 N 个 Segment
    maxSegmentCount := Params.DataCoordCfg.CompactionMaxSegmentNum.GetAsInt()
    if len(candidates) > maxSegmentCount {
        candidates = candidates[:maxSegmentCount]
    }

    return candidates, nil
}

Compaction 执行流程图

┌─────────────────────────────────────────────┐
│      CompactionTrigger 定期检查             │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│  检查 Collection/Channel 是否需要压缩       │
│  - 小 Segment 数量                          │
│  - Delta 数据量                             │
│  - 上次压缩时间                             │
└──────────────────┬──────────────────────────┘
                   │ 需要压缩
                   ▼
┌─────────────────────────────────────────────┐
│      创建 CompactionTask                    │
│  1. 选择要压缩的 Segment                    │
│  2. 分配新的 Segment ID                     │
│  3. 构建 CompactionPlan                     │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      分配给 DataNode 执行                   │
│  1. 选择负载最低的 DataNode                 │
│  2. 发送 CompactionPlan                     │
│  3. 更新任务状态为 executing                │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataNode 执行压缩                      │
│  1. 读取源 Segment 数据                     │
│  2. 合并/过滤数据                           │
│  3. 写入新 Segment                          │
│  4. 上传到对象存储                          │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataNode 报告完成                      │
│  CompactionResult                           │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataCoord 更新元数据                   │
│  1. 添加新 Segment                          │
│  2. 标记旧 Segment 为 Dropped               │
│  3. 更新任务状态为 completed                │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      GarbageCollector 清理旧数据            │
└─────────────────────────────────────────────┘

4. Index 构建调度

DataCoord 负责调度索引构建任务到 DataNode:

// internal/datacoord/index_service.go
type indexService struct {
    meta      *indexMeta
    allocator allocator.Allocator
    handler   Handler

    // 任务队列
    taskQueue *indexTaskQueue

    // 检查器
    inspector *indexInspector
}

// 创建索引任务
func (s *indexService) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) error {
    // 1. 分配 Index ID
    indexID, err := s.allocator.AllocOne()
    if err != nil {
        return err
    }

    // 2. 获取需要构建索引的 Segment
    segments := s.meta.GetSegmentsByCollection(req.CollectionID)

    // 3. 为每个 Segment 创建索引任务
    for _, segment := range segments {
        // 跳过已有索引的 Segment
        if s.meta.HasIndex(segment.ID, req.FieldID) {
            continue
        }

        // 创建索引任务
        task := &indexTask{
            IndexID:      indexID,
            CollectionID: req.CollectionID,
            PartitionID:  segment.PartitionID,
            SegmentID:    segment.ID,
            FieldID:      req.FieldID,
            IndexParams:  req.IndexParams,
            State:        commonpb.IndexState_Unissued,
        }

        // 添加到任务队列
        if err := s.taskQueue.Enqueue(task); err != nil {
            return err
        }
    }

    // 4. 持久化索引元数据
    indexMeta := &model.Index{
        IndexID:      indexID,
        CollectionID: req.CollectionID,
        FieldID:      req.FieldID,
        IndexName:    req.IndexName,
        IndexParams:  req.IndexParams,
        CreateTime:   time.Now().Unix(),
    }

    return s.meta.AddIndex(indexMeta)
}

// 调度索引任务
func (s *indexService) scheduleIndexTasks() {
    // 1. 从队列获取待执行任务
    tasks := s.taskQueue.Dequeue(10)

    for _, task := range tasks {
        // 2. 选择 DataNode
        nodeID, err := s.selectDataNode(task)
        if err != nil {
            log.Warn("failed to select datanode", zap.Error(err))
            continue
        }

        // 3. 分配任务
        if err := s.handler.AssignIndexTask(nodeID, task); err != nil {
            log.Warn("failed to assign index task", zap.Error(err))
            continue
        }

        // 4. 更新任务状态
        task.State = commonpb.IndexState_InProgress
        task.NodeID = nodeID
        s.meta.UpdateIndexTask(task)

        log.Info("index task assigned",
            zap.Int64("indexID", task.IndexID),
            zap.Int64("segmentID", task.SegmentID),
            zap.Int64("nodeID", nodeID))
    }
}

5. 垃圾回收机制

GarbageCollector 负责清理过期的数据和元数据:

// internal/datacoord/garbage_collector.go
type garbageCollector struct {
    meta    *meta
    handler Handler

    // 配置
    option GcOption

    // 状态
    closeCh chan struct{}
    wg      sync.WaitGroup
}

type GcOption struct {
    // 扫描间隔
    scanInterval time.Duration

    // 保留时间
    dropTolerance        time.Duration  // Dropped Segment 保留时间
    missingTolerance     time.Duration  // 缺失 Segment 保留时间

    // 是否启用
    enabled bool
}

// 启动垃圾回收
func (gc *garbageCollector) start() {
    if !gc.option.enabled {
        return
    }

    gc.wg.Add(1)
    go gc.recycleLoop()
}

// 回收循环
func (gc *garbageCollector) recycleLoop() {
    defer gc.wg.Done()

    ticker := time.NewTicker(gc.option.scanInterval)
    defer ticker.Stop()

    for {
        select {
        case <-gc.closeCh:
            return
        case <-ticker.C:
            // 1. 回收 Dropped Segment
            gc.recycleDroppedSegments()

            // 2. 回收 Binlog 文件
            gc.recycleBinlogs()

            // 3. 回收索引文件
            gc.recycleIndexFiles()

            // 4. 回收元数据
            gc.recycleMetadata()
        }
    }
}

// 回收 Dropped Segment
func (gc *garbageCollector) recycleDroppedSegments() {
    // 1. 获取所有 Dropped 状态的 Segment
    segments := gc.meta.GetSegmentsByState(commonpb.SegmentState_Dropped)

    now := time.Now()
    for _, segment := range segments {
        // 2. 检查是否超过保留时间
        dropTime := time.Unix(segment.DropTime, 0)
        if now.Sub(dropTime) < gc.option.dropTolerance {
            continue
        }

        // 3. 删除对象存储中的文件
        if err := gc.removeSegmentFiles(segment); err != nil {
            log.Warn("failed to remove segment files",
                zap.Int64("segmentID", segment.ID),
                zap.Error(err))
            continue
        }

        // 4. 删除元数据
        if err := gc.meta.RemoveSegment(segment.ID); err != nil {
            log.Warn("failed to remove segment metadata",
                zap.Int64("segmentID", segment.ID),
                zap.Error(err))
            continue
        }

        log.Info("segment recycled",
            zap.Int64("segmentID", segment.ID),
            zap.String("channel", segment.InsertChannel))
    }
}

// 删除 Segment 文件
func (gc *garbageCollector) removeSegmentFiles(segment *SegmentInfo) error {
    // 1. 删除 Binlog 文件
    for _, binlog := range segment.GetBinlogs() {
        for _, file := range binlog.GetBinlogs() {
            if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
                return err
            }
        }
    }

    // 2. 删除 Deltalog 文件
    for _, deltalog := range segment.GetDeltalogs() {
        for _, file := range deltalog.GetBinlogs() {
            if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
                return err
            }
        }
    }

    // 3. 删除 Statslog 文件
    for _, statslog := range segment.GetStatslogs() {
        for _, file := range statslog.GetBinlogs() {
            if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
                return err
            }
        }
    }

    return nil
}

垃圾回收流程图

┌─────────────────────────────────────────────┐
│    GarbageCollector 定期扫描                │
│    (默认每 1 小时)                          │
└──────────────────┬──────────────────────────┘
                   │
    ┌──────────────┼──────────────┐
    │              │              │
    ▼              ▼              ▼
┌──────────┐  ┌──────────┐  ┌──────────┐
│ Dropped  │  │ Binlog   │  │ Index    │
│ Segment  │  │ Files    │  │ Files    │
└────┬─────┘  └────┬─────┘  └────┬─────┘
     │             │             │
     │ 检查保留时间 │             │
     ▼             ▼             ▼
┌──────────────────────────────────────┐
│  超过保留时间?                       │
│  - Dropped: 24 小时                  │
│  - Missing: 1 小时                   │
└────┬─────────────────────────────────┘
     │ 是
     ▼
┌──────────────────────────────────────┐
│  删除对象存储中的文件                 │
│  1. Binlog 文件                      │
│  2. Deltalog 文件                    │
│  3. Statslog 文件                    │
│  4. Index 文件                       │
└────┬─────────────────────────────────┘
     │
     ▼
┌──────────────────────────────────────┐
│  删除 etcd 中的元数据                │
└──────────────────────────────────────┘

🔍 源码深度剖析

1. Channel 管理与负载均衡

DataCoord 负责管理 DML Channel 的分配和负载均衡:

// internal/datacoord/channel.go
type channelManager struct {
    meta        *meta
    nodeManager session.NodeManager

    // Channel 分配策略
    policy ChannelAssignPolicy

    // Channel 状态
    channels map[string]*channelState
    mu       sync.RWMutex
}

type channelState struct {
    Name     string
    NodeID   int64
    State    datapb.ChannelWatchState

    // 统计信息
    SegmentCount int
    DataSize     int64
}

// 分配 Channel
func (cm *channelManager) AssignChannels(ctx context.Context, channels []string) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    // 1. 获取所有可用的 DataNode
    nodes := cm.nodeManager.GetNodes()
    if len(nodes) == 0 {
        return errors.New("no available datanode")
    }

    // 2. 使用策略分配 Channel
    assignments := cm.policy.Assign(channels, nodes)

    // 3. 通知 DataNode 监听 Channel
    for nodeID, channelList := range assignments {
        req := &datapb.WatchChannelsRequest{
            Channels: channelList,
        }

        if err := cm.nodeManager.WatchChannels(nodeID, req); err != nil {
            return err
        }

        // 4. 更新 Channel 状态
        for _, channel := range channelList {
            cm.channels[channel] = &channelState{
                Name:   channel,
                NodeID: nodeID,
                State:  datapb.ChannelWatchState_ToWatch,
            }
        }
    }

    return nil
}

// Channel 负载均衡
func (cm *channelManager) Balance(ctx context.Context) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    // 1. 计算每个节点的负载
    nodeLoads := cm.calculateNodeLoads()

    // 2. 找出负载最高和最低的节点
    maxLoadNode, minLoadNode := cm.findExtremeNodes(nodeLoads)

    // 3. 检查是否需要均衡
    if !cm.shouldBalance(nodeLoads[maxLoadNode], nodeLoads[minLoadNode]) {
        return nil
    }

    // 4. 选择要迁移的 Channel
    channel := cm.selectChannelToMigrate(maxLoadNode)

    // 5. 执行迁移
    if err := cm.migrateChannel(channel, maxLoadNode, minLoadNode); err != nil {
        return err
    }

    log.Info("channel balanced",
        zap.String("channel", channel),
        zap.Int64("fromNode", maxLoadNode),
        zap.Int64("toNode", minLoadNode))

    return nil
}

Channel 分配策略

// 1. 轮询策略
type roundRobinPolicy struct {
    index int
}

func (p *roundRobinPolicy) Assign(channels []string, nodes []int64) map[int64][]string {
    result := make(map[int64][]string)

    for _, channel := range channels {
        nodeID := nodes[p.index%len(nodes)]
        result[nodeID] = append(result[nodeID], channel)
        p.index++
    }

    return result
}

// 2. 负载均衡策略
type loadBalancePolicy struct {
    meta *meta
}

func (p *loadBalancePolicy) Assign(channels []string, nodes []int64) map[int64][]string {
    // 计算每个节点的当前负载
    nodeLoads := make(map[int64]int64)
    for _, nodeID := range nodes {
        nodeLoads[nodeID] = p.calculateNodeLoad(nodeID)
    }

    result := make(map[int64][]string)

    for _, channel := range channels {
        // 选择负载最低的节点
        minLoadNode := nodes[0]
        minLoad := nodeLoads[minLoadNode]

        for _, nodeID := range nodes[1:] {
            if nodeLoads[nodeID] < minLoad {
                minLoadNode = nodeID
                minLoad = nodeLoads[nodeID]
            }
        }

        // 分配 Channel
        result[minLoadNode] = append(result[minLoadNode], channel)

        // 更新负载
        nodeLoads[minLoadNode] += p.estimateChannelLoad(channel)
    }

    return result
}

2. 元数据管理

DataCoord 的元数据管理是其核心功能:

// internal/datacoord/meta.go
type meta struct {
    // 元数据存储
    catalog metastore.DataCoordCatalog

    // 内存缓存
    segments     *SegmentsInfo              // Segment 信息
    collections  map[UniqueID]*collectionInfo // Collection 信息
    indexes      map[UniqueID]*model.Index   // Index 信息

    // 并发控制
    segmentLock sync.RWMutex
    indexLock   sync.RWMutex
}

type SegmentsInfo struct {
    segments map[UniqueID]*SegmentInfo

    // 索引
    channel2Segments map[string][]UniqueID
    state2Segments   map[commonpb.SegmentState][]UniqueID
}

// 添加 Segment
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
    m.segmentLock.Lock()
    defer m.segmentLock.Unlock()

    // 1. 检查是否已存在
    if _, ok := m.segments.segments[segment.ID]; ok {
        return fmt.Errorf("segment %d already exists", segment.ID)
    }

    // 2. 持久化到 etcd
    if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil {
        return err
    }

    // 3. 更新内存缓存
    m.segments.segments[segment.ID] = segment

    // 4. 更新索引
    channel := segment.InsertChannel
    m.segments.channel2Segments[channel] = append(
        m.segments.channel2Segments[channel],
        segment.ID,
    )

    state := segment.State
    m.segments.state2Segments[state] = append(
        m.segments.state2Segments[state],
        segment.ID,
    )

    return nil
}

// 更新 Segment
func (m *meta) UpdateSegment(ctx context.Context, segment *SegmentInfo) error {
    m.segmentLock.Lock()
    defer m.segmentLock.Unlock()

    // 1. 检查是否存在
    old, ok := m.segments.segments[segment.ID]
    if !ok {
        return fmt.Errorf("segment %d not found", segment.ID)
    }

    // 2. 持久化到 etcd
    if err := m.catalog.AlterSegment(ctx, segment.SegmentInfo); err != nil {
        return err
    }

    // 3. 更新内存缓存
    m.segments.segments[segment.ID] = segment

    // 4. 更新状态索引(如果状态改变)
    if old.State != segment.State {
        // 从旧状态列表中移除
        m.removeFromStateIndex(old.ID, old.State)

        // 添加到新状态列表
        m.segments.state2Segments[segment.State] = append(
            m.segments.state2Segments[segment.State],
            segment.ID,
        )
    }

    return nil
}

3. 数据导入(Import)机制

DataCoord 支持批量数据导入功能,并利用 Go 的高并发特性高效调度:

// internal/datacoord/import_job.go
type ImportJob struct {
    JobID        int64
    CollectionID int64
    PartitionID  int64

    // 导入文件
    Files []string

    // 任务列表
    Tasks []*ImportTask

    // 状态
    State     datapb.ImportJobState
    Reason    string
    Progress  int32

    // 时间
    CreateTime time.Time
    StartTime  time.Time
    EndTime    time.Time
}

// 创建导入任务
func (s *Server) Import(ctx context.Context, req *datapb.ImportRequest) (*datapb.ImportResponse, error) {
    // 1. 分配 Job ID
    jobID, err := s.idAllocator.AllocOne()
    if err != nil {
        return nil, err
    }

    // 2. 创建 Import Job
    job := &ImportJob{
        JobID:        jobID,
        CollectionID: req.CollectionID,
        PartitionID:  req.PartitionID,
        Files:        req.Files,
        State:        datapb.ImportJobState_Pending,
        CreateTime:   time.Now(),
    }

    // 3. 拆分为多个 Import Task
    tasks, err := s.splitImportTasks(job)
    if err != nil {
        return nil, err
    }
    job.Tasks = tasks

    // 4. 保存到元数据
    if err := s.importMeta.AddJob(job); err != nil {
        return nil, err
    }

    // 5. 提交任务到队列
    for _, task := range tasks {
        if err := s.importChecker.AddTask(task); err != nil {
            return nil, err
        }
    }

    return &datapb.ImportResponse{
        Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
        JobID:  jobID,
    }, nil
}

// 拆分导入任务
func (s *Server) splitImportTasks(job *ImportJob) ([]*ImportTask, error) {
    tasks := make([]*ImportTask, 0)

    // 每个文件创建一个 PreImport 任务
    for _, file := range job.Files {
        taskID, err := s.idAllocator.AllocOne()
        if err != nil {
            return nil, err
        }

        task := &ImportTask{
            TaskID:       taskID,
            JobID:        job.JobID,
            CollectionID: job.CollectionID,
            PartitionID:  job.PartitionID,
            Files:        []string{file},
            Type:         datapb.ImportTaskType_PreImport,
            State:        datapb.ImportTaskState_Pending,
        }

        tasks = append(tasks, task)
    }

    return tasks, nil
}

// Import Inspector 检查和调度任务
func (ii *importInspector) Start() {
    go ii.scheduleLoop()
}

func (ii *importInspector) scheduleLoop() {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ii.closeCh:
            return
        case <-ticker.C:
            // 1. 检查 Pending 任务
            ii.schedulePendingTasks()

            // 2. 检查 InProgress 任务
            ii.checkInProgressTasks()

            // 3. 检查 Completed 任务
            ii.processCompletedTasks()
        }
    }
}

func (ii *importInspector) schedulePendingTasks() {
    // 获取所有 Pending 任务
    tasks := ii.meta.GetTasksByState(datapb.ImportTaskState_Pending)

    for _, task := range tasks {
        // 选择 DataNode
        nodeID, err := ii.selectDataNode(task)
        if err != nil {
            log.Warn("failed to select datanode", zap.Error(err))
            continue
        }

        // 分配任务
        if err := ii.assignTask(nodeID, task); err != nil {
            log.Warn("failed to assign import task", zap.Error(err))
            continue
        }

        // 更新任务状态
        task.State = datapb.ImportTaskState_InProgress
        task.NodeID = nodeID
        ii.meta.UpdateTask(task)

        log.Info("import task assigned",
            zap.Int64("taskID", task.TaskID),
            zap.Int64("nodeID", nodeID))
    }
}

Import 流程图

┌─────────────────────────────────────────────┐
│      Client 提交 Import 请求                │
│      - Collection ID                        │
│      - 文件列表(S3/MinIO 路径)            │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataCoord 创建 Import Job              │
│  1. 分配 Job ID                             │
│  2. 拆分为多个 PreImport Task               │
│  3. 保存元数据                              │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      ImportInspector 调度 PreImport Task    │
│  1. 选择 DataNode                           │
│  2. 分配任务                                │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataNode 执行 PreImport                │
│  1. 读取文件头信息                          │
│  2. 验证 Schema                             │
│  3. 统计数据量                              │
│  4. 返回统计信息                            │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataCoord 创建 Import Task             │
│  根据数据量拆分为多个 Import Task           │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataNode 执行 Import                   │
│  1. 读取数据文件                            │
│  2. 转换为 Milvus 格式                      │
│  3. 写入 Segment                            │
│  4. 上传到对象存储                          │
└──────────────────┬──────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────────────┐
│      DataCoord 更新元数据                   │
│  1. 添加新 Segment                          │
│  2. 更新 Job 进度                           │
│  3. 标记任务完成                            │
└─────────────────────────────────────────────┘

🛠️ 调试与实验

1. 如何调试 DataCoord

方法 1:查看 Segment 信息
# 使用 Birdwatcher 查看 Segment
birdwatcher> connect --etcd localhost:2379
# 查看所有 Segment
birdwatcher> show segment
# 查看特定 Collection 的 Segment
birdwatcher> show segment --collection 123456
# 查看 Segment 详细信息
birdwatcher> show segment-info --segment 789012
方法 2:监控 Compaction
# 查看 Compaction 任务
birdwatcher> show compaction-task

# 查看 Compaction 统计
curl http://localhost:9091/metrics | grep datacoord_compaction

# 关键指标:
# - datacoord_compaction_task_num: 任务数量
# - datacoord_compaction_latency: 延迟
# - datacoord_compaction_data_size: 数据量
方法 3:调试 Segment 分配
# debug_segment_allocation.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import time

# 连接 Milvus
connections.connect(host="localhost", port="19530")

# 创建 Collection
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]
schema = CollectionSchema(fields=fields)
collection = Collection(name="test_segment", schema=schema)

# 持续插入数据,观察 Segment 分配
for i in range(100):
    data = [
        [j for j in range(i*1000, (i+1)*1000)],
        [[0.1]*128 for _ in range(1000)]
    ]
    collection.insert(data)

    # 刷盘
    collection.flush()

    # 查看 Segment 数量
    print(f"Round {i}: {collection.num_entities} entities")

    time.sleep(1)

观察 DataCoord 日志

# 查看 DataCoord 日志
kubectl logs -f milvus-datacoord-xxx -n milvus

# 关键日志:
[INFO] [datacoord/segment_manager.go:xxx] segment allocated
    segmentID: 446402150400012346
    channel: by-dev-rootcoord-dml_0
    numRows: 1000

[INFO] [datacoord/segment_manager.go:xxx] segment sealed
    segmentID: 446402150400012346
    reason: reach max row num

[INFO] [datacoord/compaction_trigger.go:xxx] compaction triggered
    collectionID: 446402150400012345
    channel: by-dev-rootcoord-dml_0
    segmentCount: 5

2. 实验:观察 Compaction 过程

# experiment_compaction.py
from pymilvus import connections, Collection, utility
import time

connections.connect(host="localhost", port="19530")

# 创建 Collection
collection = Collection("test_compaction")

# 1. 插入大量小批次数据(产生很多小 Segment)
print("Inserting data to create small segments...")
for i in range(50):
    data = [
        [j for j in range(i*100, (i+1)*100)],
        [[0.1]*128 for _ in range(100)]
    ]
    collection.insert(data)
    collection.flush()
    time.sleep(0.1)

# 2. 查看 Segment 数量
stats = collection.get_stats()
print(f"Segment count before compaction: {stats}")

# 3. 手动触发 Compaction
print("Triggering compaction...")
utility.do_compact(collection.name)

# 4. 等待 Compaction 完成
time.sleep(30)

# 5. 查看 Compaction 后的 Segment 数量
stats = collection.get_stats()
print(f"Segment count after compaction: {stats}")

3. 常见问题排查

问题 1:Segment 分配失败

现象

Error: failed to allocate segment

排查步骤

# 1. 检查 DataNode 状态
birdwatcher> show session --role datanode

# 2. 检查 Channel 分配
birdwatcher> show channel-watch

# 3. 检查 Segment 配额
# 查看是否达到最大 Segment 数量限制

解决方案

# 调整配置
dataCoord:
  segment:
    maxSize: 1024  # 增加 Segment 最大大小
    sealProportion: 0.75  # 调整封存比例
问题 2:Compaction 不执行

现象

小 Segment 一直不合并

排查步骤

# 1. 检查 Compaction 配置
curl http://localhost:9091/api/v1/config | grep compaction

# 2. 检查 Compaction 触发器状态
birdwatcher> show compaction-trigger

# 3. 查看 DataNode 负载
birdwatcher> show datanode-load

解决方案

# 调整 Compaction 配置
dataCoord:
  compaction:
    enableAutoCompaction: true
    minSegmentToMerge: 3  # 降低最小合并数量
    maxSegmentToMerge: 10
问题 3:Index 构建慢

现象

索引构建任务长时间不完成

排查步骤

# 1. 检查 Index 任务状态
birdwatcher> show index-task

# 2. 检查 DataNode 资源
kubectl top pod -n milvus | grep datanode

# 3. 查看 Index 构建日志
kubectl logs -f milvus-datanode-xxx -n milvus | grep index

解决方案

# 增加 DataNode 资源
dataNode:
  resources:
    limits:
      cpu: "8"
      memory: "16Gi"

# 增加并发数
  indexBuildParallel: 4

📚 扩展阅读

1. 相关设计文档

  • DataCoord Design Doc
  • Segment Management
  • Compaction Design
  • Import Design

2. 相关论文

  • LSM-Tree: The Log-Structured Merge-Tree
    • Compaction 机制的理论基础
  • Bigtable: A Distributed Storage System for Structured Data
    • Google 的分布式存储系统,Segment 管理的灵感来源
  • RocksDB: A Persistent Key-Value Store
    • LSM-Tree 的实现,Compaction 策略参考

3. 源码阅读建议

推荐阅读顺序

  1. 入门(1-2 天)
    • server.go - 理解整体结构
    • segment_manager.go - 理解 Segment 管理
    • meta.go - 理解元数据管理
  2. 进阶(3-5 天)
    • compaction_trigger.go - 理解 Compaction 触发
    • compaction_task_*.go - 理解 Compaction 任务
    • index_service.go - 理解索引服务
  3. 高级(1 周)
    • channel.go - 理解 Channel 管理
    • garbage_collector.go - 理解垃圾回收
    • import_*.go - 理解数据导入

调试技巧

// 1. 在关键路径添加断点
// segment_manager.go:AllocSegment
// compaction_trigger.go:TriggerCompaction
// index_service.go:CreateIndex

// 2. 使用条件断点
// 只在特定 Collection 时触发
if req.CollectionID == 123456 {
    // 断点位置
}

// 3. 观察变量
// - segment.State
// - compactionTask.State
// - indexTask.Progress

4. 性能优化建议

Segment 管理优化
# 1. 调整 Segment 大小
dataCoord:
  segment:
    maxSize: 1024 # MB,根据数据特点调整
    sealProportion: 0.75 # 封存比例

# 2. 调整封存策略
dataCoord:
  segment:
    maxLifetime: 86400 # 秒,最大生命周期
    maxIdleTime: 3600   # 秒,最大空闲时间
Compaction 优化
# 1. 调整 Compaction 触发条件
dataCoord:
  compaction:
    enableAutoCompaction: true
    minSegmentToMerge: 3
    maxSegmentToMerge: 10

# 2. 调整 Compaction 间隔
dataCoord:
  compaction:
    checkInterval: 60 # 秒

# 3. 启用 Clustering Compaction
dataCoord:
  compaction:
    enableClustering: true
    clusteringInterval: 3600 # 秒
Index 构建优化
# 1. 增加并发数
dataNode:
  indexBuildParallel: 4

# 2. 调整资源限制
dataNode:
  resources:
    limits:
      cpu: "8"
      memory: "16Gi"

# 3. 使用 GPU 加速
dataNode:
  enableGPU: true
  gpuMemoryRatio: 0.5

5. 贡献代码指南

常见贡献方向

  1. Compaction 优化
    • 改进 Compaction 策略
    • 优化 Compaction 性能
    • 支持新的 Compaction 类型
  2. Segment 管理
    • 优化 Segment 分配算法
    • 改进封存策略
    • 支持更灵活的配置
  3. Index 服务
    • 支持新的索引类型
    • 优化索引构建调度
    • 改进索引进度跟踪

提交 PR 流程

# 1. Fork 仓库
git clone https://github.com/your-username/milvus.git

# 2. 创建分支
git checkout -b feature/improve-datacoord

# 3. 修改代码
# 编辑 internal/datacoord/xxx.go

# 4. 添加测试
# 编辑 internal/datacoord/xxx_test.go

# 5. 运行测试
cd internal/datacoord
go test -v

# 6. 提交代码
git commit -m "feat: improve DataCoord compaction"
git push origin feature/improve-datacoord

# 7. 创建 Pull Request

💡 总结

核心要点回顾

  1. DataCoord 是 Milvus 的数据管家
    • 管理 Segment 的完整生命周期
    • 调度 Compaction 和 Index 构建
    • 负责 Channel 分配和负载均衡
    • 执行垃圾回收清理过期数据
  2. 四大核心组件
    • SegmentManager:Segment 分配、封存、删除
    • CompactionTrigger:定期触发 Compaction 任务
    • IndexService:管理索引构建任务
    • GarbageCollector:清理过期数据和元数据
  3. Segment 生命周期
    • New → Growing → Sealed → Flushed → Compacted → Dropped
    • 多种封存策略:容量、生命周期、空闲时间
    • 自动刷盘和压缩机制
  4. Compaction 机制
    • Mix Compaction:合并小 Segment,减少文件数
    • L0 Compaction:合并 Delta 数据,应用删除操作
    • Clustering Compaction:数据聚簇,优化范围查询
  5. 关键设计模式
    • 策略模式:灵活的 Segment 封存和分配策略
    • 观察者模式:Inspector 监控任务状态
    • 生产者-消费者:任务队列和调度
    • 状态机:Segment 和任务的状态转换

最佳实践建议

  1. Segment 管理
    • 根据数据特点调整 Segment 大小
    • 合理设置封存策略
    • 监控 Segment 数量和大小分布
  2. Compaction 优化
    • 启用自动 Compaction
    • 根据负载调整触发间隔
    • 定期执行 Clustering Compaction
  3. Index 构建
    • 合理分配 DataNode 资源
    • 调整并发数提高吞吐量
    • 使用 GPU 加速索引构建
  4. 监控和调试
    • 使用 Birdwatcher 查看元数据
    • 监控 Prometheus 指标
    • 定期检查云原生架构下的垃圾回收效果



上一篇:EDR-Freeze攻击原理剖析:基于死锁冻结的EDR/AV进程竞态条件攻击
下一篇:汇川PLC伺服控制中的MC_Home指令详解:原点回归功能配置与调试
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 03:21 , Processed in 0.132800 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表