📋 概述
DataCoord 的定位
DataCoord(Data Coordinator)是 Milvus 的“数据管家”,负责整个集群的数据生命周期管理。它的核心职责包括:
- Segment 管理:分配、封存、合并 Segment
- Channel 管理:管理 DML Channel 的分配和负载均衡
- Compaction 调度:触发和调度数据压缩任务
- Index 构建:调度索引构建任务
- 垃圾回收:清理过期的数据和元数据
核心功能列表
DataCoord 核心功能
├── Segment 生命周期管理
│ ├── Segment 分配(Growing)
│ ├── Segment 封存(Sealed)
│ ├── Segment 刷盘(Flushed)
│ └── Segment 删除(Dropped)
├── Compaction 压缩
│ ├── L0 Compaction(Delta 合并)
│ ├── Mix Compaction(小文件合并)
│ ├── Clustering Compaction(聚簇优化)
│ └── Compaction 任务调度
├── Index 索引管理
│ ├── Index 构建任务分配
│ ├── Index 进度跟踪
│ └── Index 版本管理
├── Channel 管理
│ ├── Channel 分配
│ ├── Channel 负载均衡
│ └── Channel 故障恢复
└── 垃圾回收
├── Segment GC
├── Binlog GC
└── Index GC
与其他组件的关系
┌─────────────────────────────────────────────┐
│ RootCoord │
│ (通知 Collection 创建) │
└──────────────────┬──────────────────────────┘
│ DDL 事件
▼
┌─────────────────────────────────────────────┐
│ DataCoord │
│ ┌─────────────────────────────────────┐ │
│ │ SegmentManager (Segment 管理) │ │
│ └─────────────────────────────────────┘ │
│ ┌─────────────────────────────────────┐ │
│ │ CompactionTrigger (压缩触发器) │ │
│ └─────────────────────────────────────┘ │
│ ┌─────────────────────────────────────┐ │
│ │ IndexService (索引服务) │ │
│ └─────────────────────────────────────┘ │
│ ┌─────────────────────────────────────┐ │
│ │ GarbageCollector (垃圾回收) │ │
│ └─────────────────────────────────────┘ │
└──────────┬───────────┬──────────┬───────────┘
│ │ │
│ 分配任务 │ 监控 │ 清理
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│DataNode │ │QueryNode │ │ MinIO │
│(执行任务)│ │(查询数据)│ │(存储) │
└──────────┘ └──────────┘ └──────────┘
🗂️ 源码结构
目录结构
internal/datacoord/
├── server.go # 主服务实现
├── meta.go # 元数据管理
├── segment_manager.go # Segment 管理器
├── channel.go # Channel 管理
│
# Segment 相关
├── segment_info.go # Segment 信息
├── segment_operator.go # Segment 操作
├── segment_allocation_policy.go # Segment 分配策略
│
# Compaction 相关
├── compaction_trigger.go # Compaction 触发器
├── compaction_trigger_v2.go # 新版触发器
├── compaction_inspector.go # Compaction 检查器
├── compaction_policy_*.go # Compaction 策略
├── compaction_task_*.go # Compaction 任务
├── compaction_queue.go # Compaction 队列
├── compaction_view.go # Compaction 视图
│
# Index 相关
├── index_service.go # 索引服务
├── index_meta.go # 索引元数据
├── index_inspector.go # 索引检查器
├── index_engine_version_manager.go # 索引引擎版本管理
│
# Import 相关
├── import_job.go # 导入任务
├── import_meta.go # 导入元数据
├── import_task_*.go # 导入任务实现
├── import_inspector.go # 导入检查器
├── import_checker.go # 导入检查器
│
# 其他
├── garbage_collector.go # 垃圾回收
├── handler.go # 请求处理器
├── services.go # gRPC 服务实现
├── allocator/ # ID 分配器
├── broker/ # 与其他组件通信
├── session/ # 会话管理
└── task/ # 任务系统
核心数据结构
让我们先看看 DataCoord 的核心结构体:
// internal/datacoord/server.go
// Server implements `types.DataCoord`
type Server struct {
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
quitCh chan struct{}
stateCode atomic.Value
// 客户端连接
etcdCli *clientv3.Client // etcd 客户端
tikvCli *txnkv.Client // TiKV 客户端(可选)
// 核心组件
meta *meta // 元数据管理
segmentManager Manager // Segment 管理器
allocator allocator.Allocator // ID 分配器
idAllocator *GlobalIDAllocator // 全局 ID 分配器
// 节点管理
nodeManager session.NodeManager // DataNode 管理
cluster session.Cluster // 集群管理
// Compaction 相关
compactionTrigger trigger // Compaction 触发器
compactionInspector CompactionInspector // Compaction 检查器
compactionTriggerManager TriggerManager // Compaction 触发管理器
// Index 相关
indexService *indexService // 索引服务
indexMeta *indexMeta // 索引元数据
// Import 相关
importMeta ImportMeta // 导入元数据
importInspector ImportInspector // 导入检查器
importChecker ImportChecker // 导入检查器
// 垃圾回收
garbageCollector *garbageCollector // 垃圾回收器
// 通道
flushCh chan UniqueID // 刷盘通道
notifyIndexChan chan UniqueID // 索引通知通道
// 会话
session sessionutil.SessionInterface // 服务会话
dnSessionWatcher sessionutil.SessionWatcher // DataNode 会话监听
qnSessionWatcher sessionutil.SessionWatcher // QueryNode 会话监听
}
关键字段说明:
- meta:管理所有 Segment、Channel、Index 的元数据
- segmentManager:负责 Segment 的分配、封存、删除
- compactionTrigger:定期触发 Compaction 任务
- indexService:管理索引构建任务
- garbageCollector:清理过期的数据和元数据
🚀 核心流程解析
1. DataCoord 启动流程
// internal/datacoord/server.go
func (s *Server) Init() error {
// 1. 初始化 etcd 客户端
s.initEtcd()
// 2. 初始化会话(注册到 etcd)
s.initSession()
// 3. 初始化 ID 分配器
s.initIDAllocator()
// 4. 初始化元数据
s.initMeta()
// 5. 初始化 Segment 管理器
s.initSegmentManager()
// 6. 初始化 Compaction 组件
s.initCompaction()
// 7. 初始化 Index 服务
s.initIndexService()
// 8. 初始化垃圾回收器
s.initGarbageCollector()
// 9. 初始化节点管理器
s.initNodeManager()
return nil
}
func (s *Server) Start() error {
// 1. 启动节点管理器(监听 DataNode)
s.nodeManager.Start()
// 2. 启动 Segment 管理器
s.segmentManager.Start()
// 3. 启动 Compaction 触发器
s.compactionTrigger.start()
// 4. 启动 Index 服务
s.indexService.Start()
// 5. 启动垃圾回收器
s.garbageCollector.start()
// 6. 启动后台任务
s.startServerLoop()
// 7. 更新服务状态
s.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}
启动流程图:
┌─────────────────────────────────────────────┐
│ DataCoord.Init() │
└──────────────────┬──────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌──────────┐
│ etcd │ │ Session │ │ ID/Meta │
│ Client │ │ Register│ │Allocator │
└────────┘ └─────────┘ └──────────┘
│
▼
┌─────────────────┐
│ SegmentManager │
│ (初始化) │
└─────────────────┘
│
▼
┌─────────────────┐
│ Compaction │
│ (初始化触发器) │
└─────────────────┘
│
▼
┌─────────────────┐
│ IndexService │
│ (初始化) │
└─────────────────┘
│
▼
┌─────────────────┐
│ DataCoord │
│ Ready! │
└─────────────────┘
2. Segment 管理核心实现
SegmentManager 是 DataCoord 最核心的组件,负责 Segment 的整个生命周期:
// internal/datacoord/segment_manager.go
type SegmentManager struct {
meta *meta // 元数据
allocator allocator.Allocator // ID 分配器
helper allocHelper // 辅助工具
// 并发控制
channelLock *lock.KeyLock[string] // Channel 级锁
channel2Growing *typeutil.ConcurrentMap[string, UniqueSet] // Channel -> Growing Segments
channel2Sealed *typeutil.ConcurrentMap[string, UniqueSet] // Channel -> Sealed Segments
// 策略
estimatePolicy calUpperLimitPolicy // 容量估算策略
allocPolicy AllocatePolicy // 分配策略
segmentSealPolicies []SegmentSealPolicy // 封存策略
channelSealPolicies []channelSealPolicy // Channel 封存策略
flushPolicy flushPolicy // 刷盘策略
}
Segment 分配流程:
// internal/datacoord/segment_manager.go
func (s *SegmentManager) AllocSegment(
ctx context.Context,
collectionID, partitionID UniqueID,
channelName string,
requestRows int64,
storageVersion int64,
) ([]*Allocation, error) {
// 1. 获取 Channel 锁
s.channelLock.Lock(channelName)
defer s.channelLock.Unlock(channelName)
// 2. 获取或创建 Growing Segment
segment, err := s.getOrCreateGrowingSegment(ctx, collectionID, partitionID, channelName, storageVersion)
if err != nil {
return nil, err
}
// 3. 检查 Segment 容量
if !s.canAllocate(segment, requestRows) {
// 封存当前 Segment
if err := s.sealSegment(ctx, segment.GetID()); err != nil {
return nil, err
}
// 创建新的 Growing Segment
segment, err = s.createNewSegment(ctx, collectionID, partitionID, channelName, storageVersion)
if err != nil {
return nil, err
}
}
// 4. 分配空间
allocation := &Allocation{
SegmentID: segment.GetID(),
NumOfRows: requestRows,
ExpireTime: s.calculateExpireTime(),
}
// 5. 更新 Segment 元数据
segment.NumOfRows += requestRows
if err := s.meta.UpdateSegment(segment); err != nil {
return nil, err
}
return []*Allocation{allocation}, nil
}
Segment 状态转换:
┌─────────────────────────────────────────────┐
│ Segment 生命周期 │
└─────────────────────────────────────────────┘
┌──────────┐
│ New │ 创建新 Segment
└────┬─────┘
│ AllocSegment()
▼
┌──────────┐
│ Growing │ 正在写入数据
└────┬─────┘
│ 达到容量/超时
│ SealSegment()
▼
┌──────────┐
│ Sealed │ 已封存,等待刷盘
└────┬─────┘
│ FlushSegment()
▼
┌──────────┐
│ Flushed │ 已刷盘到对象存储
└────┬─────┘
│ Compaction
▼
┌──────────┐
│Compacted │ 已压缩
└────┬─────┘
│ GC
▼
┌──────────┐
│ Dropped │ 已删除
└──────────┘
Segment 封存策略:
// internal/datacoord/segment_allocation_policy.go
// Segment 封存策略接口
type SegmentSealPolicy interface {
ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string)
}
// 1. 容量策略:Segment 达到最大行数
type sealByCapacityPolicy struct {
maxRowNum int64
}
func (p *sealByCapacityPolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
if segment.NumOfRows >= p.maxRowNum {
return true, fmt.Sprintf("segment %d reaches max row num %d", segment.ID, p.maxRowNum)
}
return false, ""
}
// 2. 生命周期策略:Segment 存在时间过长
type sealByLifetimePolicy struct {
lifetime time.Duration
}
func (p *sealByLifetimePolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
createTime := tsoutil.PhysicalTime(segment.CreateTime)
currentTime := tsoutil.PhysicalTime(ts)
if currentTime.Sub(createTime) > p.lifetime {
return true, fmt.Sprintf("segment %d exceeds lifetime %v", segment.ID, p.lifetime)
}
return false, ""
}
// 3. 空闲策略:Segment 长时间没有写入
type sealByIdlePolicy struct {
idleTime time.Duration
}
func (p *sealByIdlePolicy) ShouldSeal(segment *SegmentInfo, ts Timestamp) (bool, string) {
lastModifyTime := tsoutil.PhysicalTime(segment.LastModifyTime)
currentTime := tsoutil.PhysicalTime(ts)
if currentTime.Sub(lastModifyTime) > p.idleTime {
return true, fmt.Sprintf("segment %d is idle for %v", segment.ID, p.idleTime)
}
return false, ""
}
3. Compaction 机制深度解析
Compaction 是 DataCoord 的核心功能之一,用于优化数据存储和查询性能:
Compaction 类型
// pkg/proto/datapb/data_coord.proto
enum CompactionType {
UndefinedCompaction = 0;
MixCompaction = 1; // 混合压缩:合并小 Segment
L0Compaction = 2; // L0 压缩:合并 Delta 数据
ClusteringCompaction = 3; // 聚簇压缩:按主键排序
}
Compaction 类型对比:
┌─────────────────────────────────────────────────────────────┐
│ Compaction 类型对比 │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ 类型 │ 目的 │ 触发条件 │ 效果 │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ Mix │ 合并小文件 │ 小 Segment │ 减少文件数 │
│ Compaction │ 减少碎片 │ 数量过多 │ 提升查询性能 │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ L0 │ 合并 Delta │ Delete 操作 │ 清理删除数据 │
│ Compaction │ 应用删除 │ 累积过多 │ 减少存储空间 │
├──────────────┼──────────────┼──────────────┼────────────────┤
│ Clustering │ 数据聚簇 │ 手动触发 │ 优化范围查询 │
│ Compaction │ 主键排序 │ 或定期执行 │ 提升过滤性能 │
└──────────────┴──────────────┴──────────────┴────────────────┘
Compaction 触发器实现
// internal/datacoord/compaction_trigger.go
type compactionTrigger struct {
handler Handler
meta *meta
allocator allocator.Allocator
// 信号通道
signals chan *compactionSignal // 自动触发信号
manualSignals chan *compactionSignal // 手动触发信号
// 检查器
inspector CompactionInspector
// 定时器
globalTrigger *time.Ticker
// 策略
estimateNonDiskSegmentPolicy calUpperLimitPolicy
estimateDiskSegmentPolicy calUpperLimitPolicy
}
// 启动 Compaction 触发器
func (t *compactionTrigger) start() {
// 1. 启动全局触发器(定期检查)
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.CompactionCheckInterval.GetAsDuration(time.Second))
// 2. 启动信号处理循环
go t.startSignalLoop()
// 3. 启动全局触发循环
go t.startGlobalTriggerLoop()
}
// 全局触发循环
func (t *compactionTrigger) startGlobalTriggerLoop() {
for {
select {
case <-t.closeCh:
return
case <-t.globalTrigger.C:
// 检查所有 Collection 的 Compaction 需求
t.triggerCompactionForAllCollections()
}
}
}
// 为所有 Collection 触发 Compaction
func (t *compactionTrigger) triggerCompactionForAllCollections() {
collections := t.meta.GetCollections()
for _, collectionID := range collections {
// 获取 Collection 的所有 Channel
channels := t.meta.GetChannelsByCollection(collectionID)
for _, channel := range channels {
// 检查是否需要 Compaction
if t.shouldTriggerCompaction(collectionID, channel) {
// 创建 Compaction 信号
signal := NewCompactionSignal().
WithCollectionID(collectionID).
WithChannel(channel).
WithWaitResult(false)
// 发送信号
select {
case t.signals <- signal:
case <-t.closeCh:
return
}
}
}
}
}
Compaction 任务执行
// internal/datacoord/compaction_task_mix.go
type mixCompactionTask struct {
*datapb.CompactionTask
meta *meta
allocator allocator.Allocator
handler Handler
// 任务状态
state datapb.CompactionTaskState
startTime time.Time
endTime time.Time
}
// 执行 Mix Compaction
func (t *mixCompactionTask) Process() error {
// 1. 选择要压缩的 Segment
segments, err := t.selectSegments()
if err != nil {
return err
}
// 2. 分配新的 Segment ID
compactedSegmentID, err := t.allocator.AllocOne()
if err != nil {
return err
}
// 3. 创建 Compaction 计划
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
Type: datapb.CompactionType_MixCompaction,
SegmentBinlogs: t.buildSegmentBinlogs(segments),
CompactedSegmentID: compactedSegmentID,
Channel: t.GetChannel(),
}
// 4. 分配给 DataNode 执行
nodeID, err := t.handler.AssignCompactionTask(plan)
if err != nil {
return err
}
// 5. 更新任务状态
t.state = datapb.CompactionTaskState_executing
t.NodeID = nodeID
log.Info("mix compaction task assigned",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("nodeID", nodeID),
zap.Int("segmentCount", len(segments)))
return nil
}
// 选择要压缩的 Segment
func (t *mixCompactionTask) selectSegments() ([]*SegmentInfo, error) {
// 1. 获取 Channel 的所有 Flushed Segment
segments := t.meta.GetSegmentsByChannel(t.GetChannel())
// 2. 过滤条件
candidates := make([]*SegmentInfo, 0)
for _, segment := range segments {
// 只选择 Flushed 状态的 Segment
if segment.State != commonpb.SegmentState_Flushed {
continue
}
// 跳过正在 Compaction 的 Segment
if segment.IsCompacting {
continue
}
// 跳过太大的 Segment
if segment.GetSize() > t.getMaxSegmentSize() {
continue
}
candidates = append(candidates, segment)
}
// 3. 按大小排序,优先合并小 Segment
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].GetSize() < candidates[j].GetSize()
})
// 4. 选择前 N 个 Segment
maxSegmentCount := Params.DataCoordCfg.CompactionMaxSegmentNum.GetAsInt()
if len(candidates) > maxSegmentCount {
candidates = candidates[:maxSegmentCount]
}
return candidates, nil
}
Compaction 执行流程图:
┌─────────────────────────────────────────────┐
│ CompactionTrigger 定期检查 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 检查 Collection/Channel 是否需要压缩 │
│ - 小 Segment 数量 │
│ - Delta 数据量 │
│ - 上次压缩时间 │
└──────────────────┬──────────────────────────┘
│ 需要压缩
▼
┌─────────────────────────────────────────────┐
│ 创建 CompactionTask │
│ 1. 选择要压缩的 Segment │
│ 2. 分配新的 Segment ID │
│ 3. 构建 CompactionPlan │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ 分配给 DataNode 执行 │
│ 1. 选择负载最低的 DataNode │
│ 2. 发送 CompactionPlan │
│ 3. 更新任务状态为 executing │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataNode 执行压缩 │
│ 1. 读取源 Segment 数据 │
│ 2. 合并/过滤数据 │
│ 3. 写入新 Segment │
│ 4. 上传到对象存储 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataNode 报告完成 │
│ CompactionResult │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataCoord 更新元数据 │
│ 1. 添加新 Segment │
│ 2. 标记旧 Segment 为 Dropped │
│ 3. 更新任务状态为 completed │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ GarbageCollector 清理旧数据 │
└─────────────────────────────────────────────┘
4. Index 构建调度
DataCoord 负责调度索引构建任务到 DataNode:
// internal/datacoord/index_service.go
type indexService struct {
meta *indexMeta
allocator allocator.Allocator
handler Handler
// 任务队列
taskQueue *indexTaskQueue
// 检查器
inspector *indexInspector
}
// 创建索引任务
func (s *indexService) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) error {
// 1. 分配 Index ID
indexID, err := s.allocator.AllocOne()
if err != nil {
return err
}
// 2. 获取需要构建索引的 Segment
segments := s.meta.GetSegmentsByCollection(req.CollectionID)
// 3. 为每个 Segment 创建索引任务
for _, segment := range segments {
// 跳过已有索引的 Segment
if s.meta.HasIndex(segment.ID, req.FieldID) {
continue
}
// 创建索引任务
task := &indexTask{
IndexID: indexID,
CollectionID: req.CollectionID,
PartitionID: segment.PartitionID,
SegmentID: segment.ID,
FieldID: req.FieldID,
IndexParams: req.IndexParams,
State: commonpb.IndexState_Unissued,
}
// 添加到任务队列
if err := s.taskQueue.Enqueue(task); err != nil {
return err
}
}
// 4. 持久化索引元数据
indexMeta := &model.Index{
IndexID: indexID,
CollectionID: req.CollectionID,
FieldID: req.FieldID,
IndexName: req.IndexName,
IndexParams: req.IndexParams,
CreateTime: time.Now().Unix(),
}
return s.meta.AddIndex(indexMeta)
}
// 调度索引任务
func (s *indexService) scheduleIndexTasks() {
// 1. 从队列获取待执行任务
tasks := s.taskQueue.Dequeue(10)
for _, task := range tasks {
// 2. 选择 DataNode
nodeID, err := s.selectDataNode(task)
if err != nil {
log.Warn("failed to select datanode", zap.Error(err))
continue
}
// 3. 分配任务
if err := s.handler.AssignIndexTask(nodeID, task); err != nil {
log.Warn("failed to assign index task", zap.Error(err))
continue
}
// 4. 更新任务状态
task.State = commonpb.IndexState_InProgress
task.NodeID = nodeID
s.meta.UpdateIndexTask(task)
log.Info("index task assigned",
zap.Int64("indexID", task.IndexID),
zap.Int64("segmentID", task.SegmentID),
zap.Int64("nodeID", nodeID))
}
}
5. 垃圾回收机制
GarbageCollector 负责清理过期的数据和元数据:
// internal/datacoord/garbage_collector.go
type garbageCollector struct {
meta *meta
handler Handler
// 配置
option GcOption
// 状态
closeCh chan struct{}
wg sync.WaitGroup
}
type GcOption struct {
// 扫描间隔
scanInterval time.Duration
// 保留时间
dropTolerance time.Duration // Dropped Segment 保留时间
missingTolerance time.Duration // 缺失 Segment 保留时间
// 是否启用
enabled bool
}
// 启动垃圾回收
func (gc *garbageCollector) start() {
if !gc.option.enabled {
return
}
gc.wg.Add(1)
go gc.recycleLoop()
}
// 回收循环
func (gc *garbageCollector) recycleLoop() {
defer gc.wg.Done()
ticker := time.NewTicker(gc.option.scanInterval)
defer ticker.Stop()
for {
select {
case <-gc.closeCh:
return
case <-ticker.C:
// 1. 回收 Dropped Segment
gc.recycleDroppedSegments()
// 2. 回收 Binlog 文件
gc.recycleBinlogs()
// 3. 回收索引文件
gc.recycleIndexFiles()
// 4. 回收元数据
gc.recycleMetadata()
}
}
}
// 回收 Dropped Segment
func (gc *garbageCollector) recycleDroppedSegments() {
// 1. 获取所有 Dropped 状态的 Segment
segments := gc.meta.GetSegmentsByState(commonpb.SegmentState_Dropped)
now := time.Now()
for _, segment := range segments {
// 2. 检查是否超过保留时间
dropTime := time.Unix(segment.DropTime, 0)
if now.Sub(dropTime) < gc.option.dropTolerance {
continue
}
// 3. 删除对象存储中的文件
if err := gc.removeSegmentFiles(segment); err != nil {
log.Warn("failed to remove segment files",
zap.Int64("segmentID", segment.ID),
zap.Error(err))
continue
}
// 4. 删除元数据
if err := gc.meta.RemoveSegment(segment.ID); err != nil {
log.Warn("failed to remove segment metadata",
zap.Int64("segmentID", segment.ID),
zap.Error(err))
continue
}
log.Info("segment recycled",
zap.Int64("segmentID", segment.ID),
zap.String("channel", segment.InsertChannel))
}
}
// 删除 Segment 文件
func (gc *garbageCollector) removeSegmentFiles(segment *SegmentInfo) error {
// 1. 删除 Binlog 文件
for _, binlog := range segment.GetBinlogs() {
for _, file := range binlog.GetBinlogs() {
if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
return err
}
}
}
// 2. 删除 Deltalog 文件
for _, deltalog := range segment.GetDeltalogs() {
for _, file := range deltalog.GetBinlogs() {
if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
return err
}
}
}
// 3. 删除 Statslog 文件
for _, statslog := range segment.GetStatslogs() {
for _, file := range statslog.GetBinlogs() {
if err := gc.handler.RemoveFile(file.GetLogPath()); err != nil {
return err
}
}
}
return nil
}
垃圾回收流程图:
┌─────────────────────────────────────────────┐
│ GarbageCollector 定期扫描 │
│ (默认每 1 小时) │
└──────────────────┬──────────────────────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Dropped │ │ Binlog │ │ Index │
│ Segment │ │ Files │ │ Files │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ 检查保留时间 │ │
▼ ▼ ▼
┌──────────────────────────────────────┐
│ 超过保留时间? │
│ - Dropped: 24 小时 │
│ - Missing: 1 小时 │
└────┬─────────────────────────────────┘
│ 是
▼
┌──────────────────────────────────────┐
│ 删除对象存储中的文件 │
│ 1. Binlog 文件 │
│ 2. Deltalog 文件 │
│ 3. Statslog 文件 │
│ 4. Index 文件 │
└────┬─────────────────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ 删除 etcd 中的元数据 │
└──────────────────────────────────────┘
🔍 源码深度剖析
1. Channel 管理与负载均衡
DataCoord 负责管理 DML Channel 的分配和负载均衡:
// internal/datacoord/channel.go
type channelManager struct {
meta *meta
nodeManager session.NodeManager
// Channel 分配策略
policy ChannelAssignPolicy
// Channel 状态
channels map[string]*channelState
mu sync.RWMutex
}
type channelState struct {
Name string
NodeID int64
State datapb.ChannelWatchState
// 统计信息
SegmentCount int
DataSize int64
}
// 分配 Channel
func (cm *channelManager) AssignChannels(ctx context.Context, channels []string) error {
cm.mu.Lock()
defer cm.mu.Unlock()
// 1. 获取所有可用的 DataNode
nodes := cm.nodeManager.GetNodes()
if len(nodes) == 0 {
return errors.New("no available datanode")
}
// 2. 使用策略分配 Channel
assignments := cm.policy.Assign(channels, nodes)
// 3. 通知 DataNode 监听 Channel
for nodeID, channelList := range assignments {
req := &datapb.WatchChannelsRequest{
Channels: channelList,
}
if err := cm.nodeManager.WatchChannels(nodeID, req); err != nil {
return err
}
// 4. 更新 Channel 状态
for _, channel := range channelList {
cm.channels[channel] = &channelState{
Name: channel,
NodeID: nodeID,
State: datapb.ChannelWatchState_ToWatch,
}
}
}
return nil
}
// Channel 负载均衡
func (cm *channelManager) Balance(ctx context.Context) error {
cm.mu.Lock()
defer cm.mu.Unlock()
// 1. 计算每个节点的负载
nodeLoads := cm.calculateNodeLoads()
// 2. 找出负载最高和最低的节点
maxLoadNode, minLoadNode := cm.findExtremeNodes(nodeLoads)
// 3. 检查是否需要均衡
if !cm.shouldBalance(nodeLoads[maxLoadNode], nodeLoads[minLoadNode]) {
return nil
}
// 4. 选择要迁移的 Channel
channel := cm.selectChannelToMigrate(maxLoadNode)
// 5. 执行迁移
if err := cm.migrateChannel(channel, maxLoadNode, minLoadNode); err != nil {
return err
}
log.Info("channel balanced",
zap.String("channel", channel),
zap.Int64("fromNode", maxLoadNode),
zap.Int64("toNode", minLoadNode))
return nil
}
Channel 分配策略:
// 1. 轮询策略
type roundRobinPolicy struct {
index int
}
func (p *roundRobinPolicy) Assign(channels []string, nodes []int64) map[int64][]string {
result := make(map[int64][]string)
for _, channel := range channels {
nodeID := nodes[p.index%len(nodes)]
result[nodeID] = append(result[nodeID], channel)
p.index++
}
return result
}
// 2. 负载均衡策略
type loadBalancePolicy struct {
meta *meta
}
func (p *loadBalancePolicy) Assign(channels []string, nodes []int64) map[int64][]string {
// 计算每个节点的当前负载
nodeLoads := make(map[int64]int64)
for _, nodeID := range nodes {
nodeLoads[nodeID] = p.calculateNodeLoad(nodeID)
}
result := make(map[int64][]string)
for _, channel := range channels {
// 选择负载最低的节点
minLoadNode := nodes[0]
minLoad := nodeLoads[minLoadNode]
for _, nodeID := range nodes[1:] {
if nodeLoads[nodeID] < minLoad {
minLoadNode = nodeID
minLoad = nodeLoads[nodeID]
}
}
// 分配 Channel
result[minLoadNode] = append(result[minLoadNode], channel)
// 更新负载
nodeLoads[minLoadNode] += p.estimateChannelLoad(channel)
}
return result
}
2. 元数据管理
DataCoord 的元数据管理是其核心功能:
// internal/datacoord/meta.go
type meta struct {
// 元数据存储
catalog metastore.DataCoordCatalog
// 内存缓存
segments *SegmentsInfo // Segment 信息
collections map[UniqueID]*collectionInfo // Collection 信息
indexes map[UniqueID]*model.Index // Index 信息
// 并发控制
segmentLock sync.RWMutex
indexLock sync.RWMutex
}
type SegmentsInfo struct {
segments map[UniqueID]*SegmentInfo
// 索引
channel2Segments map[string][]UniqueID
state2Segments map[commonpb.SegmentState][]UniqueID
}
// 添加 Segment
func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
m.segmentLock.Lock()
defer m.segmentLock.Unlock()
// 1. 检查是否已存在
if _, ok := m.segments.segments[segment.ID]; ok {
return fmt.Errorf("segment %d already exists", segment.ID)
}
// 2. 持久化到 etcd
if err := m.catalog.AddSegment(ctx, segment.SegmentInfo); err != nil {
return err
}
// 3. 更新内存缓存
m.segments.segments[segment.ID] = segment
// 4. 更新索引
channel := segment.InsertChannel
m.segments.channel2Segments[channel] = append(
m.segments.channel2Segments[channel],
segment.ID,
)
state := segment.State
m.segments.state2Segments[state] = append(
m.segments.state2Segments[state],
segment.ID,
)
return nil
}
// 更新 Segment
func (m *meta) UpdateSegment(ctx context.Context, segment *SegmentInfo) error {
m.segmentLock.Lock()
defer m.segmentLock.Unlock()
// 1. 检查是否存在
old, ok := m.segments.segments[segment.ID]
if !ok {
return fmt.Errorf("segment %d not found", segment.ID)
}
// 2. 持久化到 etcd
if err := m.catalog.AlterSegment(ctx, segment.SegmentInfo); err != nil {
return err
}
// 3. 更新内存缓存
m.segments.segments[segment.ID] = segment
// 4. 更新状态索引(如果状态改变)
if old.State != segment.State {
// 从旧状态列表中移除
m.removeFromStateIndex(old.ID, old.State)
// 添加到新状态列表
m.segments.state2Segments[segment.State] = append(
m.segments.state2Segments[segment.State],
segment.ID,
)
}
return nil
}
3. 数据导入(Import)机制
DataCoord 支持批量数据导入功能,并利用 Go 的高并发特性高效调度:
// internal/datacoord/import_job.go
type ImportJob struct {
JobID int64
CollectionID int64
PartitionID int64
// 导入文件
Files []string
// 任务列表
Tasks []*ImportTask
// 状态
State datapb.ImportJobState
Reason string
Progress int32
// 时间
CreateTime time.Time
StartTime time.Time
EndTime time.Time
}
// 创建导入任务
func (s *Server) Import(ctx context.Context, req *datapb.ImportRequest) (*datapb.ImportResponse, error) {
// 1. 分配 Job ID
jobID, err := s.idAllocator.AllocOne()
if err != nil {
return nil, err
}
// 2. 创建 Import Job
job := &ImportJob{
JobID: jobID,
CollectionID: req.CollectionID,
PartitionID: req.PartitionID,
Files: req.Files,
State: datapb.ImportJobState_Pending,
CreateTime: time.Now(),
}
// 3. 拆分为多个 Import Task
tasks, err := s.splitImportTasks(job)
if err != nil {
return nil, err
}
job.Tasks = tasks
// 4. 保存到元数据
if err := s.importMeta.AddJob(job); err != nil {
return nil, err
}
// 5. 提交任务到队列
for _, task := range tasks {
if err := s.importChecker.AddTask(task); err != nil {
return nil, err
}
}
return &datapb.ImportResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
JobID: jobID,
}, nil
}
// 拆分导入任务
func (s *Server) splitImportTasks(job *ImportJob) ([]*ImportTask, error) {
tasks := make([]*ImportTask, 0)
// 每个文件创建一个 PreImport 任务
for _, file := range job.Files {
taskID, err := s.idAllocator.AllocOne()
if err != nil {
return nil, err
}
task := &ImportTask{
TaskID: taskID,
JobID: job.JobID,
CollectionID: job.CollectionID,
PartitionID: job.PartitionID,
Files: []string{file},
Type: datapb.ImportTaskType_PreImport,
State: datapb.ImportTaskState_Pending,
}
tasks = append(tasks, task)
}
return tasks, nil
}
// Import Inspector 检查和调度任务
func (ii *importInspector) Start() {
go ii.scheduleLoop()
}
func (ii *importInspector) scheduleLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ii.closeCh:
return
case <-ticker.C:
// 1. 检查 Pending 任务
ii.schedulePendingTasks()
// 2. 检查 InProgress 任务
ii.checkInProgressTasks()
// 3. 检查 Completed 任务
ii.processCompletedTasks()
}
}
}
func (ii *importInspector) schedulePendingTasks() {
// 获取所有 Pending 任务
tasks := ii.meta.GetTasksByState(datapb.ImportTaskState_Pending)
for _, task := range tasks {
// 选择 DataNode
nodeID, err := ii.selectDataNode(task)
if err != nil {
log.Warn("failed to select datanode", zap.Error(err))
continue
}
// 分配任务
if err := ii.assignTask(nodeID, task); err != nil {
log.Warn("failed to assign import task", zap.Error(err))
continue
}
// 更新任务状态
task.State = datapb.ImportTaskState_InProgress
task.NodeID = nodeID
ii.meta.UpdateTask(task)
log.Info("import task assigned",
zap.Int64("taskID", task.TaskID),
zap.Int64("nodeID", nodeID))
}
}
Import 流程图:
┌─────────────────────────────────────────────┐
│ Client 提交 Import 请求 │
│ - Collection ID │
│ - 文件列表(S3/MinIO 路径) │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataCoord 创建 Import Job │
│ 1. 分配 Job ID │
│ 2. 拆分为多个 PreImport Task │
│ 3. 保存元数据 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ ImportInspector 调度 PreImport Task │
│ 1. 选择 DataNode │
│ 2. 分配任务 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataNode 执行 PreImport │
│ 1. 读取文件头信息 │
│ 2. 验证 Schema │
│ 3. 统计数据量 │
│ 4. 返回统计信息 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataCoord 创建 Import Task │
│ 根据数据量拆分为多个 Import Task │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataNode 执行 Import │
│ 1. 读取数据文件 │
│ 2. 转换为 Milvus 格式 │
│ 3. 写入 Segment │
│ 4. 上传到对象存储 │
└──────────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────────┐
│ DataCoord 更新元数据 │
│ 1. 添加新 Segment │
│ 2. 更新 Job 进度 │
│ 3. 标记任务完成 │
└─────────────────────────────────────────────┘
🛠️ 调试与实验
1. 如何调试 DataCoord
方法 1:查看 Segment 信息
# 使用 Birdwatcher 查看 Segment
birdwatcher> connect --etcd localhost:2379
# 查看所有 Segment
birdwatcher> show segment
# 查看特定 Collection 的 Segment
birdwatcher> show segment --collection 123456
# 查看 Segment 详细信息
birdwatcher> show segment-info --segment 789012
方法 2:监控 Compaction
# 查看 Compaction 任务
birdwatcher> show compaction-task
# 查看 Compaction 统计
curl http://localhost:9091/metrics | grep datacoord_compaction
# 关键指标:
# - datacoord_compaction_task_num: 任务数量
# - datacoord_compaction_latency: 延迟
# - datacoord_compaction_data_size: 数据量
方法 3:调试 Segment 分配
# debug_segment_allocation.py
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import time
# 连接 Milvus
connections.connect(host="localhost", port="19530")
# 创建 Collection
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128)
]
schema = CollectionSchema(fields=fields)
collection = Collection(name="test_segment", schema=schema)
# 持续插入数据,观察 Segment 分配
for i in range(100):
data = [
[j for j in range(i*1000, (i+1)*1000)],
[[0.1]*128 for _ in range(1000)]
]
collection.insert(data)
# 刷盘
collection.flush()
# 查看 Segment 数量
print(f"Round {i}: {collection.num_entities} entities")
time.sleep(1)
观察 DataCoord 日志:
# 查看 DataCoord 日志
kubectl logs -f milvus-datacoord-xxx -n milvus
# 关键日志:
[INFO] [datacoord/segment_manager.go:xxx] segment allocated
segmentID: 446402150400012346
channel: by-dev-rootcoord-dml_0
numRows: 1000
[INFO] [datacoord/segment_manager.go:xxx] segment sealed
segmentID: 446402150400012346
reason: reach max row num
[INFO] [datacoord/compaction_trigger.go:xxx] compaction triggered
collectionID: 446402150400012345
channel: by-dev-rootcoord-dml_0
segmentCount: 5
2. 实验:观察 Compaction 过程
# experiment_compaction.py
from pymilvus import connections, Collection, utility
import time
connections.connect(host="localhost", port="19530")
# 创建 Collection
collection = Collection("test_compaction")
# 1. 插入大量小批次数据(产生很多小 Segment)
print("Inserting data to create small segments...")
for i in range(50):
data = [
[j for j in range(i*100, (i+1)*100)],
[[0.1]*128 for _ in range(100)]
]
collection.insert(data)
collection.flush()
time.sleep(0.1)
# 2. 查看 Segment 数量
stats = collection.get_stats()
print(f"Segment count before compaction: {stats}")
# 3. 手动触发 Compaction
print("Triggering compaction...")
utility.do_compact(collection.name)
# 4. 等待 Compaction 完成
time.sleep(30)
# 5. 查看 Compaction 后的 Segment 数量
stats = collection.get_stats()
print(f"Segment count after compaction: {stats}")
3. 常见问题排查
问题 1:Segment 分配失败
现象:
Error: failed to allocate segment
排查步骤:
# 1. 检查 DataNode 状态
birdwatcher> show session --role datanode
# 2. 检查 Channel 分配
birdwatcher> show channel-watch
# 3. 检查 Segment 配额
# 查看是否达到最大 Segment 数量限制
解决方案:
# 调整配置
dataCoord:
segment:
maxSize: 1024 # 增加 Segment 最大大小
sealProportion: 0.75 # 调整封存比例
问题 2:Compaction 不执行
现象:
小 Segment 一直不合并
排查步骤:
# 1. 检查 Compaction 配置
curl http://localhost:9091/api/v1/config | grep compaction
# 2. 检查 Compaction 触发器状态
birdwatcher> show compaction-trigger
# 3. 查看 DataNode 负载
birdwatcher> show datanode-load
解决方案:
# 调整 Compaction 配置
dataCoord:
compaction:
enableAutoCompaction: true
minSegmentToMerge: 3 # 降低最小合并数量
maxSegmentToMerge: 10
问题 3:Index 构建慢
现象:
索引构建任务长时间不完成
排查步骤:
# 1. 检查 Index 任务状态
birdwatcher> show index-task
# 2. 检查 DataNode 资源
kubectl top pod -n milvus | grep datanode
# 3. 查看 Index 构建日志
kubectl logs -f milvus-datanode-xxx -n milvus | grep index
解决方案:
# 增加 DataNode 资源
dataNode:
resources:
limits:
cpu: "8"
memory: "16Gi"
# 增加并发数
indexBuildParallel: 4
📚 扩展阅读
1. 相关设计文档
- DataCoord Design Doc
- Segment Management
- Compaction Design
- Import Design
2. 相关论文
- LSM-Tree: The Log-Structured Merge-Tree
- Bigtable: A Distributed Storage System for Structured Data
- Google 的分布式存储系统,Segment 管理的灵感来源
- RocksDB: A Persistent Key-Value Store
- LSM-Tree 的实现,Compaction 策略参考
3. 源码阅读建议
推荐阅读顺序:
- 入门(1-2 天)
server.go - 理解整体结构
segment_manager.go - 理解 Segment 管理
meta.go - 理解元数据管理
- 进阶(3-5 天)
compaction_trigger.go - 理解 Compaction 触发
compaction_task_*.go - 理解 Compaction 任务
index_service.go - 理解索引服务
- 高级(1 周)
channel.go - 理解 Channel 管理
garbage_collector.go - 理解垃圾回收
import_*.go - 理解数据导入
调试技巧:
// 1. 在关键路径添加断点
// segment_manager.go:AllocSegment
// compaction_trigger.go:TriggerCompaction
// index_service.go:CreateIndex
// 2. 使用条件断点
// 只在特定 Collection 时触发
if req.CollectionID == 123456 {
// 断点位置
}
// 3. 观察变量
// - segment.State
// - compactionTask.State
// - indexTask.Progress
4. 性能优化建议
Segment 管理优化
# 1. 调整 Segment 大小
dataCoord:
segment:
maxSize: 1024 # MB,根据数据特点调整
sealProportion: 0.75 # 封存比例
# 2. 调整封存策略
dataCoord:
segment:
maxLifetime: 86400 # 秒,最大生命周期
maxIdleTime: 3600 # 秒,最大空闲时间
Compaction 优化
# 1. 调整 Compaction 触发条件
dataCoord:
compaction:
enableAutoCompaction: true
minSegmentToMerge: 3
maxSegmentToMerge: 10
# 2. 调整 Compaction 间隔
dataCoord:
compaction:
checkInterval: 60 # 秒
# 3. 启用 Clustering Compaction
dataCoord:
compaction:
enableClustering: true
clusteringInterval: 3600 # 秒
Index 构建优化
# 1. 增加并发数
dataNode:
indexBuildParallel: 4
# 2. 调整资源限制
dataNode:
resources:
limits:
cpu: "8"
memory: "16Gi"
# 3. 使用 GPU 加速
dataNode:
enableGPU: true
gpuMemoryRatio: 0.5
5. 贡献代码指南
常见贡献方向:
- Compaction 优化
- 改进 Compaction 策略
- 优化 Compaction 性能
- 支持新的 Compaction 类型
- Segment 管理
- 优化 Segment 分配算法
- 改进封存策略
- 支持更灵活的配置
- Index 服务
- 支持新的索引类型
- 优化索引构建调度
- 改进索引进度跟踪
提交 PR 流程:
# 1. Fork 仓库
git clone https://github.com/your-username/milvus.git
# 2. 创建分支
git checkout -b feature/improve-datacoord
# 3. 修改代码
# 编辑 internal/datacoord/xxx.go
# 4. 添加测试
# 编辑 internal/datacoord/xxx_test.go
# 5. 运行测试
cd internal/datacoord
go test -v
# 6. 提交代码
git commit -m "feat: improve DataCoord compaction"
git push origin feature/improve-datacoord
# 7. 创建 Pull Request
💡 总结
核心要点回顾
- DataCoord 是 Milvus 的数据管家
- 管理 Segment 的完整生命周期
- 调度 Compaction 和 Index 构建
- 负责 Channel 分配和负载均衡
- 执行垃圾回收清理过期数据
- 四大核心组件
- SegmentManager:Segment 分配、封存、删除
- CompactionTrigger:定期触发 Compaction 任务
- IndexService:管理索引构建任务
- GarbageCollector:清理过期数据和元数据
- Segment 生命周期
- New → Growing → Sealed → Flushed → Compacted → Dropped
- 多种封存策略:容量、生命周期、空闲时间
- 自动刷盘和压缩机制
- Compaction 机制
- Mix Compaction:合并小 Segment,减少文件数
- L0 Compaction:合并 Delta 数据,应用删除操作
- Clustering Compaction:数据聚簇,优化范围查询
- 关键设计模式
- 策略模式:灵活的 Segment 封存和分配策略
- 观察者模式:Inspector 监控任务状态
- 生产者-消费者:任务队列和调度
- 状态机:Segment 和任务的状态转换
最佳实践建议
- Segment 管理
- 根据数据特点调整 Segment 大小
- 合理设置封存策略
- 监控 Segment 数量和大小分布
- Compaction 优化
- 启用自动 Compaction
- 根据负载调整触发间隔
- 定期执行 Clustering Compaction
- Index 构建
- 合理分配 DataNode 资源
- 调整并发数提高吞吐量
- 使用 GPU 加速索引构建
- 监控和调试
- 使用 Birdwatcher 查看元数据
- 监控 Prometheus 指标
- 定期检查云原生架构下的垃圾回收效果