找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

221

积分

0

好友

31

主题
发表于 昨天 00:32 | 查看: 2| 回复: 0

一、Milvus 数据模型概览

1.1 层次结构

Milvus 的数据组织采用四层结构,类似关系数据库但又有所不同:

Database(数据库)  
└── Collection(集合)       
└── Partition(分区)            
└── Segment(段)                 
└── 实际数据(向量 + 标量字段)

与关系数据库对比

Milvus 关系数据库 说明
Database Database 逻辑隔离单元
Collection Table 数据表
Partition Partition 分区
Segment Page/Block 物理存储单元
Field Column 字段/列
Entity Row 数据行

1.2 核心概念

Database(数据库)

Milvus 2.2+ 支持多数据库,实现租户隔离:

from pymilvus import connections, db

# 连接 Milvus
connections.connect(host="localhost", port="19530")

# 创建数据库
db.create_database("my_database")

# 列出所有数据库
databases = db.list_database()
print(databases)  # ['default', 'my_database']

# 使用数据库
db.using_database("my_database")

使用场景

  • 多租户隔离
  • 开发/测试/生产环境分离
  • 不同业务线数据隔离
Collection(集合)

集合是 Milvus 中最重要的概念,类似关系数据库的表:

from pymilvus import Collection, FieldSchema, CollectionSchema, DataType

# 定义 Schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000),
    FieldSchema(name="score", dtype=DataType.FLOAT),
    FieldSchema(name="metadata", dtype=DataType.JSON)  # 动态字段
]

schema = CollectionSchema(
    fields=fields,
    description="Document collection",
    enable_dynamic_field=True  # 支持动态字段
)

# 创建集合
collection = Collection(
    name="documents",
    schema=schema,
    shards_num=2  # 分片数(虚拟通道数)
)

Collection 的关键属性

  1. Schema:定义字段类型和约束
  2. Shards:虚拟通道数,影响写入并发度
  3. Consistency Level:一致性级别
  4. TTL:数据过期时间(可选)
Partition(分区)

分区是集合的逻辑划分,用于提高查询效率:

# 创建分区
collection.create_partition("partition_2024")
collection.create_partition("partition_2023")

# 插入数据到指定分区
collection.insert(data, partition_name="partition_2024")

# 查询指定分区
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "COSINE", "params": {"ef": 64}},
    limit=10,
    partition_names=["partition_2024"]  # 只搜索 2024 年的数据
)

分区策略

  1. 按时间分区:适合日志、事件数据
    # 按月分区
    partition_names = ["2024_01", "2024_02", "2024_03"]
  2. 按业务分区:适合多业务场景
    # 按类别分区
    partition_names = ["tech", "finance", "healthcare"]
  3. 按分区键自动分区(Milvus 2.2+):
    # 定义分区键
    FieldSchema(
       name="user_id",
       dtype=DataType.INT64,
       is_partition_key=True  # 自动按 user_id 分区
    )
Segment(段)

Segment 是 Milvus 数据存储的最小单位,类似 LSM-Tree 的 SSTable:

Segment 的生命周期:
Growing Segment(增长段)
  ↓ 达到阈值(512MB)
Sealed Segment(封闭段)
  ↓ 构建索引
Indexed Segment(索引段)
  ↓ 压缩合并
Compacted Segment(压缩段)

Segment 类型

类型 状态 说明 可查询
Growing 增长中 接收新数据
Sealed 已封闭 不再接收数据
Flushed 已刷盘 持久化到对象存储
Indexed 已索引 构建了向量索引
Dropped 已删除 等待垃圾回收

二、数据写入流程

2.1 完整的写入链路

跟踪一条数据从客户端到持久化的完整旅程:

第 1 步:客户端发起插入
┌─────────────┐
│   Client    │ insert(data)
└──────┬──────┘
       │
第 2 步:Proxy 预处理
┌──────▼──────┐
│    Proxy    │ 1. 验证 Schema
└──────┬──────┘ 2. 分配主键(如果 auto_id)
       │       3. 分配时间戳
       │       4. 数据分片(按主键哈希)
       │
第 3 步:写入 WAL(消息队列)
┌──────▼──────┐
│ Pulsar/Kafka│ 持久化日志,保证不丢失
└──────┬──────┘
       │
       ├────────────────────┬────────────────────┐
       │                    │                    │
第 4 步:DataNode 消费      第 5 步:QueryNode 消费
┌──────▼──────┐        ┌───▼────┐
│  DataNode   │        │QueryNode│
└──────┬──────┘        └───┬────┘
       │                   │
       │ 1. 批量聚合       │ 1. 更新内存索引
       │ 2. 构建 Segment   │ 2. 实时可查询
       │ 3. 持久化 Binlog  │
       │                   │
第 6 步:存储到对象存储
┌──────▼──────┐
│  MinIO/S3   │ 永久存储
└─────────────┘

2.2 详细代码流程

步骤 1:客户端插入
from pymilvus import Collection

collection = Collection("documents")

# 准备数据
data = [
    {
        "embedding": [0.1] * 768,
        "text": "Milvus is a vector database",
        "score": 0.95
    },
    {
        "embedding": [0.2] * 768,
        "text": "Vector search is fast",
        "score": 0.88
    }
]

# 插入数据
result = collection.insert(data)
print(f"Inserted {result.insert_count} entities")
print(f"Primary keys: {result.primary_keys}")
步骤 2:Proxy 处理
// Proxy 的插入处理(简化版)
func (p *Proxy) Insert(ctx context.Context, req *InsertRequest) (*InsertResponse, error) {
    // 1. 获取集合信息
    collectionInfo := p.getCollectionInfo(req.CollectionName)

    // 2. 验证 Schema
    if err := validateSchema(req.Data, collectionInfo.Schema); err != nil {
        return nil, err
    }

    // 3. 分配主键(如果 auto_id)
    if collectionInfo.AutoID {
        req.IDs = p.idAllocator.AllocN(len(req.Data))
    }

    // 4. 分配时间戳
    ts, err := p.tsoAllocator.AllocOne()
    if err != nil {
        return nil, err
    }
    req.Timestamp = ts

    // 5. 数据分片(按主键哈希到不同的虚拟通道)
    shardData := p.hashToShards(req.Data, collectionInfo.ShardNum)

    // 6. 写入消息队列
    for shardID, data := range shardData {
        msg := &InsertMsg{
            CollectionID: collectionInfo.ID,
            PartitionID:  req.PartitionID,
            ShardID:      shardID,
            Data:         data,
            Timestamp:    ts,
        }
        p.msgStream.Produce(shardID, msg)
    }

    return &InsertResponse{
        InsertCount: len(req.Data),
        IDs:         req.IDs,
    }, nil
}
步骤 3:DataNode 消费 WAL
// DataNode 的数据处理
type DataNode struct {
    insertBuffer *InsertBuffer  // 内存缓冲区
    segments     map[int64]*Segment
}

func (dn *DataNode) consumeWAL() {
    for msg := range dn.msgStream.Chan() {
        insertMsg := msg.(*InsertMsg)

        // 1. 写入内存缓冲区
        dn.insertBuffer.Add(insertMsg)

        // 2. 检查是否需要刷盘
        if dn.insertBuffer.Size() >= FlushThreshold {
            dn.flush()
        }
    }
}

func (dn *DataNode) flush() {
    // 1. 从缓冲区取出数据
    data := dn.insertBuffer.Drain()

    // 2. 按 Segment 组织数据
    segmentData := dn.groupBySegment(data)

    for segmentID, data := range segmentData {
        // 3. 序列化为 Binlog
        binlog := dn.serializeToBinlog(data)

        // 4. 上传到对象存储
        path := fmt.Sprintf("binlog/%d/%d/%d",
            data.CollectionID,
            data.PartitionID,
            segmentID)
        dn.objectStorage.Put(path, binlog)

        // 5. 通知 DataCoord
        dn.dataCoord.SaveBinlogPaths(&SaveBinlogPathsRequest{
            SegmentID:   segmentID,
            BinlogPaths: []string{path},
            NumRows:     len(data.Rows),
        })
    }
}

2.3 Segment 管理

Growing Segment
type GrowingSegment struct {
    ID           int64
    CollectionID int64
    PartitionID  int64

    // 内存中的数据
    insertBuffer *InsertBuffer
    deleteBuffer *DeleteBuffer

    // 统计信息
    numRows      int64
    memorySize   int64

    // 状态
    state        SegmentState  // Growing, Sealed, Flushed
}

// 插入数据
func (s *GrowingSegment) Insert(data *InsertData) error {
    s.insertBuffer.Add(data)
    s.numRows += int64(len(data.Rows))
    s.memorySize += data.Size()

    // 检查是否需要封闭
    if s.memorySize >= MaxSegmentSize {  // 默认 512MB
        s.Seal()
    }

    return nil
}

// 封闭 Segment
func (s *GrowingSegment) Seal() {
    s.state = SegmentState_Sealed
    // 通知 DataCoord 进行刷盘
    notifyDataCoord(s.ID)
}
Segment 压缩

Milvus 会定期压缩小 Segment,提高查询效率:

// DataCoord 的压缩策略
type CompactionStrategy struct {
    // 小 Segment 合并
    MinSegmentSize int64  // 小于 128MB 的 Segment
    MaxSegmentSize int64  // 合并后不超过 512MB

    // 删除数据压缩
    DeleteRatio float64  // 删除比例超过 20% 触发压缩
}

func (dc *DataCoord) triggerCompaction() {
    // 1. 找到需要压缩的 Segment
    segments := dc.findCompactionCandidates()

    // 2. 分组(同一个 Partition 的 Segment)
    groups := dc.groupSegments(segments)

    // 3. 创建压缩任务
    for _, group := range groups {
        task := &CompactionTask{
            Type:     MixCompaction,  // 或 MergeCompaction
            Segments: group,
        }
        dc.compactionQueue.Push(task)
    }
}

三、列式存储与 Binlog

3.1 为什么使用列式存储?

向量数据库的特点:

  • 宽表:每行有多个字段(向量 + 标量)
  • 列查询:通常只查询部分字段
  • 压缩友好:同类型数据压缩率高

行式 vs 列式

行式存储(传统数据库):
Row1: [id=1, vector=[0.1,0.2,...], text="hello", score=0.9]
Row2: [id=2, vector=[0.3,0.4,...], text="world", score=0.8]
Row3: [id=3, vector=[0.5,0.6,...], text="milvus", score=0.95]

列式存储(Milvus):
Column_id:     [1, 2, 3]
Column_vector: [[0.1,0.2,...], [0.3,0.4,...], [0.5,0.6,...]]
Column_text:   ["hello", "world", "milvus"]
Column_score:  [0.9, 0.8, 0.95]

优势

  • ✅ 只读取需要的列,减少 I/O
  • ✅ 同类型数据压缩率高(10-100 倍)
  • ✅ SIMD 优化友好
  • ✅ 支持列级索引

3.2 Binlog 格式

Milvus 使用自定义的 Binlog 格式存储数据:

Binlog 文件结构:
┌─────────────────────────────────────┐
│         Magic Number (4 bytes)      │  文件标识
├─────────────────────────────────────┤
│         Descriptor (变长)            │  元数据(Schema、压缩算法等)
├─────────────────────────────────────┤
│         Event Header (固定)          │  事件头(时间戳、类型等)
├─────────────────────────────────────┤
│         Event Data (变长)            │  实际数据(压缩后)
├─────────────────────────────────────┤
│         ...更多 Event...            │
├─────────────────────────────────────┤
│         Footer (固定)                │  文件尾(校验和等)
└─────────────────────────────────────┘

Binlog 类型

  1. Insert Binlog:插入数据 files/insert_log/{collection_id}/{partition_id}/{segment_id}/{field_id}/
  2. Delete Binlog:删除数据 files/delta_log/{collection_id}/{partition_id}/{segment_id}/
  3. Stats Binlog:统计信息 files/stats_log/{collection_id}/{partition_id}/{segment_id}/{field_id}/

3.3 数据压缩

Milvus 支持多种压缩算法:

type CompressionType int

const (
    NoCompression CompressionType = iota
    LZ4           // 快速压缩,压缩率中等
    Zstd          // 高压缩率,速度较慢
    Snappy        // 平衡方案
)

// 压缩数据
func compress(data []byte, compressionType CompressionType) []byte {
    switch compressionType {
    case LZ4:
        return lz4.Compress(data)
    case Zstd:
        return zstd.Compress(data)
    case Snappy:
        return snappy.Compress(data)
    default:
        return data
    }
}

压缩效果

数据类型 原始大小 LZ4 Zstd 压缩率
Float 向量 3072 字节 2800 字节 2500 字节 1.2x
Int64 ID 8 字节 6 字节 5 字节 1.6x
文本字段 1000 字节 400 字节 300 字节 3.3x

四、对象存储集成

4.1 为什么选择对象存储?

传统存储 vs 对象存储

特性 本地磁盘 对象存储
容量 有限(TB 级) 无限(PB 级)
成本 低(1/10)
可靠性 需要 RAID 11 个 9(99.999999999%)
扩展性 困难 自动扩展
访问速度 较慢

Milvus 的策略

  • 热数据:内存 + SSD(快速访问)
  • 冷数据:对象存储(低成本)

4.2 支持的对象存储

# MinIO(开源,本地部署)
minio:
  address: localhost:9000
  bucketName: milvus-bucket
  accessKeyID: minioadmin
  secretAccessKey: minioadmin

# AWS S3
s3:
  endpoint: s3.amazonaws.com
  bucketName: my-milvus-bucket
  region: us-west-2
  accessKeyID: ${AWS_ACCESS_KEY}
  secretAccessKey: ${AWS_SECRET_KEY}

# 阿里云 OSS
oss:
  endpoint: oss-cn-hangzhou.aliyuncs.com
  bucketName: my-milvus-bucket
  accessKeyID: ${OSS_ACCESS_KEY}
  secretAccessKey: ${OSS_SECRET_KEY}

4.3 数据布局

对象存储目录结构:
milvus-bucket/
├── files/
│   ├── insert_log/
│   │   └── {collection_id}/
│   │       └── {partition_id}/
│   │           └── {segment_id}/
│   │               ├── {field_id}/
│   │               │   └── {log_id}  # Insert Binlog
│   │               └── ...
│   ├── delta_log/
│   │   └── {collection_id}/
│   │       └── {partition_id}/
│   │           └── {segment_id}/
│   │               └── {log_id}  # Delete Binlog
│   └── index/
│       └── {collection_id}/
│           └── {partition_id}/
│               └── {segment_id}/
│                   └── {field_id}/
│                       └── {index_id}  # 索引文件
└── meta/
    └── ...  # 元数据快照

4.4 数据读取优化

预取(Prefetch)
// QueryNode 的数据预取
type DataLoader struct {
    cache      *LRUCache
    prefetcher *Prefetcher
}

func (dl *DataLoader) LoadSegment(segmentID int64) (*Segment, error) {
    // 1. 检查缓存
    if seg, ok := dl.cache.Get(segmentID); ok {
        return seg, nil
    }

    // 2. 从对象存储加载
    binlogPaths := dl.getBinlogPaths(segmentID)

    // 3. 并行加载多个字段
    var wg sync.WaitGroup
    fields := make(map[int64]*FieldData)

    for fieldID, path := range binlogPaths {
        wg.Add(1)
        go func(fid int64, p string) {
            defer wg.Done()
            data := dl.loadFromStorage(p)
            fields[fid] = data
        }(fieldID, path)
    }

    wg.Wait()

    // 4. 构建 Segment
    segment := NewSegment(segmentID, fields)

    // 5. 加入缓存
    dl.cache.Put(segmentID, segment)

    // 6. 预取相邻 Segment
    dl.prefetcher.Prefetch(segmentID + 1)

    return segment, nil
}
MMap(内存映射)

Milvus 2.3+ 支持 MMap,降低内存占用:

// 使用 MMap 加载数据
func (qn *QueryNode) loadSegmentWithMMap(segmentID int64) error {
    binlogPath := qn.getBinlogPath(segmentID)

    // 1. 下载到本地磁盘
    localPath := qn.downloadToLocal(binlogPath)

    // 2. MMap 映射到虚拟内存
    file, err := os.Open(localPath)
    if err != nil {
        return err
    }

    data, err := syscall.Mmap(
        int(file.Fd()),
        0,
        fileSize,
        syscall.PROT_READ,
        syscall.MAP_SHARED,
    )
    if err != nil {
        return err
    }

    // 3. 按需加载(操作系统自动管理)
    segment := NewSegmentFromMMap(segmentID, data)
    qn.segments[segmentID] = segment

    return nil
}

MMap 优势

  • 内存占用降低 50-70%
  • 操作系统自动管理页面换入换出
  • 适合超大数据集

五、数据删除与更新

5.1 删除机制

Milvus 使用标记删除(Soft Delete):

# 删除数据
collection.delete(expr="id in [1, 2, 3]")

# 或者按条件删除
collection.delete(expr="score < 0.5")

删除流程

1. Proxy 接收删除请求
   ↓
2. 写入 Delete Binlog(Delta Log)
   ↓
3. QueryNode 更新 Bloom Filter
   ↓
4. 查询时过滤已删除数据
   ↓
5. 压缩时物理删除

Delete Binlog 结构

type DeleteLog struct {
    PrimaryKeys []int64   // 被删除的主键
    Timestamps  []uint64  // 删除时间戳
}

// QueryNode 应用删除
func (qn *QueryNode) applyDelete(deleteLog *DeleteLog) {
    for i, pk := range deleteLog.PrimaryKeys {
        ts := deleteLog.Timestamps[i]

        // 更新 Bloom Filter
        qn.bloomFilter.Add(pk)

        // 记录删除时间戳
        qn.deletedPKs[pk] = ts
    }
}

// 查询时过滤
func (qn *QueryNode) search(query *SearchRequest) *SearchResult {
    results := qn.vectorSearch(query)

    // 过滤已删除的数据
    filtered := make([]*SearchResult, 0)
    for _, result := range results {
        if !qn.isDeleted(result.PK, query.Timestamp) {
            filtered = append(filtered, result)
        }
    }

    return filtered
}

5.2 更新机制

Milvus 不支持直接更新,需要先删除再插入:

# 更新数据(先删后插)
def update_entity(collection, pk, new_data):
    # 1. 删除旧数据
    collection.delete(expr=f"id == {pk}")

    # 2. 插入新数据
    new_data["id"] = pk
    collection.insert([new_data])

# 使用示例
update_entity(
    collection,
    pk=123,
    new_data={
        "embedding": [0.5] * 768,
        "text": "Updated text",
        "score": 0.99
    }
)

为什么不支持原地更新?

  • 向量索引难以高效更新
  • 列式存储不适合随机写
  • 删除+插入更简单可靠

六、数据一致性保证

6.1 WAL(Write-Ahead Log)

所有写操作先写 WAL,保证数据不丢失:

写入流程:
1. 数据写入 WAL(Pulsar/Kafka)✅ 持久化
2. 返回成功给客户端
3. 异步刷盘到对象存储

故障恢复:
1. 从 WAL 重放未刷盘的数据
2. 重建内存状态
3. 继续提供服务

6.2 时间戳与可见性

// 每条数据都有时间戳
type Entity struct {
    PK        int64
    Data      map[string]interface{}
    Timestamp uint64  // 插入时间戳
}

// 查询时指定时间戳
type SearchRequest struct {
    Vector    []float32
    TopK      int
    Timestamp uint64  // 查询时间戳(保证一致性)
}

// 只返回时间戳 <= 查询时间戳的数据
func (qn *QueryNode) search(req *SearchRequest) []*Entity {
    allResults := qn.vectorSearch(req.Vector, req.TopK)

    // 过滤未来数据
    visible := make([]*Entity, 0)
    for _, entity := range allResults {
        if entity.Timestamp <= req.Timestamp {
            visible = append(visible, entity)
        }
    }

    return visible
}

6.3 一致性级别

from pymilvus import Collection

collection = Collection("documents")

# 强一致性:保证读到最新数据
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "COSINE"},
    limit=10,
    consistency_level="Strong"  # 强一致性
)

# 最终一致性:更高性能
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "COSINE"},
    limit=10,
    consistency_level="Eventually"  # 最终一致性
)

# 会话一致性:同一会话内一致
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={"metric_type": "COSINE"},
    limit=10,
    consistency_level="Session"  # 会话一致性
)

七、实战:数据生命周期管理

7.1 数据过期(TTL)

from pymilvus import Collection, CollectionSchema

# 创建带 TTL 的集合
schema = CollectionSchema(
    fields=[...],
    description="Logs with TTL"
)

collection = Collection(
    name="logs",
    schema=schema,
    properties={
        "collection.ttl.seconds": 86400  # 1 天后自动删除
    }
)

7.2 数据压缩策略

# 手动触发压缩
collection.compact()

# 等待压缩完成
collection.wait_for_compaction_completed()

# 查看压缩状态
state = collection.get_compaction_state()
print(f"Compaction state: {state}")

7.3 数据备份

# 导出数据
from pymilvus import utility

# 创建备份
utility.create_alias(
    collection_name="documents",
    alias="documents_backup_20241125"
)

# 导出到文件
collection.query(
    expr="",  # 导出所有数据
    output_fields=["*"]
)

八、性能优化建议

8.1 Segment 大小优化

# 调整 Segment 大小
collection = Collection(
    name="large_collection",
    schema=schema,
    properties={
        "segment.maxSize": 1024,  # 1GB(默认 512MB)
    }
)

建议

  • 小数据集(< 1000 万):256MB
  • 中等数据集(1000 万 - 1 亿):512MB(默认)
  • 大数据集(> 1 亿):1GB

8.2 分片数优化

# 根据写入并发度设置分片数
collection = Collection(
    name="high_throughput",
    schema=schema,
    shards_num=4  # 4 个分片,支持 4 个并发写入
)

建议

  • 低并发(< 1000 QPS):2 个分片
  • 中等并发(1000-5000 QPS):4 个分片
  • 高并发(> 5000 QPS):8 个分片

8.3 批量操作

# 批量插入(推荐)
batch_size = 1000
for i in range(0, len(data), batch_size):
    batch = data[i:i+batch_size]
    collection.insert(batch)

# 批量删除
pks_to_delete = [1, 2, 3, ..., 1000]
collection.delete(expr=f"id in {pks_to_delete}")

九、总结

核心要点

  1. 数据模型:Database → Collection → Partition → Segment
  2. 列式存储:提高查询效率和压缩率
  3. Binlog:持久化数据,支持故障恢复
  4. 对象存储:低成本、高可靠的存储方案
  5. 标记删除:软删除机制,延迟物理删除

运维优化最佳实践

✅ 合理设置分区,提高查询效率 ✅ 使用批量操作,提升吞吐量 ✅ 定期压缩,清理碎片 ✅ 根据数据规模调整 Segment 大小 ✅ 使用 MMap 降低内存占用

您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-1 16:22 , Processed in 0.059815 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表