一、Milvus 数据模型概览
1.1 层次结构
Milvus 的数据组织采用四层结构,类似关系数据库但又有所不同:
Database(数据库)
└── Collection(集合)
└── Partition(分区)
└── Segment(段)
└── 实际数据(向量 + 标量字段)
与关系数据库对比:
| Milvus |
关系数据库 |
说明 |
| Database |
Database |
逻辑隔离单元 |
| Collection |
Table |
数据表 |
| Partition |
Partition |
分区 |
| Segment |
Page/Block |
物理存储单元 |
| Field |
Column |
字段/列 |
| Entity |
Row |
数据行 |
1.2 核心概念
Database(数据库)
Milvus 2.2+ 支持多数据库,实现租户隔离:
from pymilvus import connections, db
# 连接 Milvus
connections.connect(host="localhost", port="19530")
# 创建数据库
db.create_database("my_database")
# 列出所有数据库
databases = db.list_database()
print(databases) # ['default', 'my_database']
# 使用数据库
db.using_database("my_database")
使用场景:
- 多租户隔离
- 开发/测试/生产环境分离
- 不同业务线数据隔离
Collection(集合)
集合是 Milvus 中最重要的概念,类似关系数据库的表:
from pymilvus import Collection, FieldSchema, CollectionSchema, DataType
# 定义 Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1000),
FieldSchema(name="score", dtype=DataType.FLOAT),
FieldSchema(name="metadata", dtype=DataType.JSON) # 动态字段
]
schema = CollectionSchema(
fields=fields,
description="Document collection",
enable_dynamic_field=True # 支持动态字段
)
# 创建集合
collection = Collection(
name="documents",
schema=schema,
shards_num=2 # 分片数(虚拟通道数)
)
Collection 的关键属性:
- Schema:定义字段类型和约束
- Shards:虚拟通道数,影响写入并发度
- Consistency Level:一致性级别
- TTL:数据过期时间(可选)
Partition(分区)
分区是集合的逻辑划分,用于提高查询效率:
# 创建分区
collection.create_partition("partition_2024")
collection.create_partition("partition_2023")
# 插入数据到指定分区
collection.insert(data, partition_name="partition_2024")
# 查询指定分区
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 64}},
limit=10,
partition_names=["partition_2024"] # 只搜索 2024 年的数据
)
分区策略:
- 按时间分区:适合日志、事件数据
# 按月分区
partition_names = ["2024_01", "2024_02", "2024_03"]
- 按业务分区:适合多业务场景
# 按类别分区
partition_names = ["tech", "finance", "healthcare"]
- 按分区键自动分区(Milvus 2.2+):
# 定义分区键
FieldSchema(
name="user_id",
dtype=DataType.INT64,
is_partition_key=True # 自动按 user_id 分区
)
Segment(段)
Segment 是 Milvus 数据存储的最小单位,类似 LSM-Tree 的 SSTable:
Segment 的生命周期:
Growing Segment(增长段)
↓ 达到阈值(512MB)
Sealed Segment(封闭段)
↓ 构建索引
Indexed Segment(索引段)
↓ 压缩合并
Compacted Segment(压缩段)
Segment 类型:
| 类型 |
状态 |
说明 |
可查询 |
| Growing |
增长中 |
接收新数据 |
✅ |
| Sealed |
已封闭 |
不再接收数据 |
✅ |
| Flushed |
已刷盘 |
持久化到对象存储 |
✅ |
| Indexed |
已索引 |
构建了向量索引 |
✅ |
| Dropped |
已删除 |
等待垃圾回收 |
❌ |
二、数据写入流程
2.1 完整的写入链路
跟踪一条数据从客户端到持久化的完整旅程:
第 1 步:客户端发起插入
┌─────────────┐
│ Client │ insert(data)
└──────┬──────┘
│
第 2 步:Proxy 预处理
┌──────▼──────┐
│ Proxy │ 1. 验证 Schema
└──────┬──────┘ 2. 分配主键(如果 auto_id)
│ 3. 分配时间戳
│ 4. 数据分片(按主键哈希)
│
第 3 步:写入 WAL(消息队列)
┌──────▼──────┐
│ Pulsar/Kafka│ 持久化日志,保证不丢失
└──────┬──────┘
│
├────────────────────┬────────────────────┐
│ │ │
第 4 步:DataNode 消费 第 5 步:QueryNode 消费
┌──────▼──────┐ ┌───▼────┐
│ DataNode │ │QueryNode│
└──────┬──────┘ └───┬────┘
│ │
│ 1. 批量聚合 │ 1. 更新内存索引
│ 2. 构建 Segment │ 2. 实时可查询
│ 3. 持久化 Binlog │
│ │
第 6 步:存储到对象存储
┌──────▼──────┐
│ MinIO/S3 │ 永久存储
└─────────────┘
2.2 详细代码流程
步骤 1:客户端插入
from pymilvus import Collection
collection = Collection("documents")
# 准备数据
data = [
{
"embedding": [0.1] * 768,
"text": "Milvus is a vector database",
"score": 0.95
},
{
"embedding": [0.2] * 768,
"text": "Vector search is fast",
"score": 0.88
}
]
# 插入数据
result = collection.insert(data)
print(f"Inserted {result.insert_count} entities")
print(f"Primary keys: {result.primary_keys}")
步骤 2:Proxy 处理
// Proxy 的插入处理(简化版)
func (p *Proxy) Insert(ctx context.Context, req *InsertRequest) (*InsertResponse, error) {
// 1. 获取集合信息
collectionInfo := p.getCollectionInfo(req.CollectionName)
// 2. 验证 Schema
if err := validateSchema(req.Data, collectionInfo.Schema); err != nil {
return nil, err
}
// 3. 分配主键(如果 auto_id)
if collectionInfo.AutoID {
req.IDs = p.idAllocator.AllocN(len(req.Data))
}
// 4. 分配时间戳
ts, err := p.tsoAllocator.AllocOne()
if err != nil {
return nil, err
}
req.Timestamp = ts
// 5. 数据分片(按主键哈希到不同的虚拟通道)
shardData := p.hashToShards(req.Data, collectionInfo.ShardNum)
// 6. 写入消息队列
for shardID, data := range shardData {
msg := &InsertMsg{
CollectionID: collectionInfo.ID,
PartitionID: req.PartitionID,
ShardID: shardID,
Data: data,
Timestamp: ts,
}
p.msgStream.Produce(shardID, msg)
}
return &InsertResponse{
InsertCount: len(req.Data),
IDs: req.IDs,
}, nil
}
步骤 3:DataNode 消费 WAL
// DataNode 的数据处理
type DataNode struct {
insertBuffer *InsertBuffer // 内存缓冲区
segments map[int64]*Segment
}
func (dn *DataNode) consumeWAL() {
for msg := range dn.msgStream.Chan() {
insertMsg := msg.(*InsertMsg)
// 1. 写入内存缓冲区
dn.insertBuffer.Add(insertMsg)
// 2. 检查是否需要刷盘
if dn.insertBuffer.Size() >= FlushThreshold {
dn.flush()
}
}
}
func (dn *DataNode) flush() {
// 1. 从缓冲区取出数据
data := dn.insertBuffer.Drain()
// 2. 按 Segment 组织数据
segmentData := dn.groupBySegment(data)
for segmentID, data := range segmentData {
// 3. 序列化为 Binlog
binlog := dn.serializeToBinlog(data)
// 4. 上传到对象存储
path := fmt.Sprintf("binlog/%d/%d/%d",
data.CollectionID,
data.PartitionID,
segmentID)
dn.objectStorage.Put(path, binlog)
// 5. 通知 DataCoord
dn.dataCoord.SaveBinlogPaths(&SaveBinlogPathsRequest{
SegmentID: segmentID,
BinlogPaths: []string{path},
NumRows: len(data.Rows),
})
}
}
2.3 Segment 管理
Growing Segment
type GrowingSegment struct {
ID int64
CollectionID int64
PartitionID int64
// 内存中的数据
insertBuffer *InsertBuffer
deleteBuffer *DeleteBuffer
// 统计信息
numRows int64
memorySize int64
// 状态
state SegmentState // Growing, Sealed, Flushed
}
// 插入数据
func (s *GrowingSegment) Insert(data *InsertData) error {
s.insertBuffer.Add(data)
s.numRows += int64(len(data.Rows))
s.memorySize += data.Size()
// 检查是否需要封闭
if s.memorySize >= MaxSegmentSize { // 默认 512MB
s.Seal()
}
return nil
}
// 封闭 Segment
func (s *GrowingSegment) Seal() {
s.state = SegmentState_Sealed
// 通知 DataCoord 进行刷盘
notifyDataCoord(s.ID)
}
Segment 压缩
Milvus 会定期压缩小 Segment,提高查询效率:
// DataCoord 的压缩策略
type CompactionStrategy struct {
// 小 Segment 合并
MinSegmentSize int64 // 小于 128MB 的 Segment
MaxSegmentSize int64 // 合并后不超过 512MB
// 删除数据压缩
DeleteRatio float64 // 删除比例超过 20% 触发压缩
}
func (dc *DataCoord) triggerCompaction() {
// 1. 找到需要压缩的 Segment
segments := dc.findCompactionCandidates()
// 2. 分组(同一个 Partition 的 Segment)
groups := dc.groupSegments(segments)
// 3. 创建压缩任务
for _, group := range groups {
task := &CompactionTask{
Type: MixCompaction, // 或 MergeCompaction
Segments: group,
}
dc.compactionQueue.Push(task)
}
}
三、列式存储与 Binlog
3.1 为什么使用列式存储?
向量数据库的特点:
- 宽表:每行有多个字段(向量 + 标量)
- 列查询:通常只查询部分字段
- 压缩友好:同类型数据压缩率高
行式 vs 列式:
行式存储(传统数据库):
Row1: [id=1, vector=[0.1,0.2,...], text="hello", score=0.9]
Row2: [id=2, vector=[0.3,0.4,...], text="world", score=0.8]
Row3: [id=3, vector=[0.5,0.6,...], text="milvus", score=0.95]
列式存储(Milvus):
Column_id: [1, 2, 3]
Column_vector: [[0.1,0.2,...], [0.3,0.4,...], [0.5,0.6,...]]
Column_text: ["hello", "world", "milvus"]
Column_score: [0.9, 0.8, 0.95]
优势:
- ✅ 只读取需要的列,减少 I/O
- ✅ 同类型数据压缩率高(10-100 倍)
- ✅ SIMD 优化友好
- ✅ 支持列级索引
3.2 Binlog 格式
Milvus 使用自定义的 Binlog 格式存储数据:
Binlog 文件结构:
┌─────────────────────────────────────┐
│ Magic Number (4 bytes) │ 文件标识
├─────────────────────────────────────┤
│ Descriptor (变长) │ 元数据(Schema、压缩算法等)
├─────────────────────────────────────┤
│ Event Header (固定) │ 事件头(时间戳、类型等)
├─────────────────────────────────────┤
│ Event Data (变长) │ 实际数据(压缩后)
├─────────────────────────────────────┤
│ ...更多 Event... │
├─────────────────────────────────────┤
│ Footer (固定) │ 文件尾(校验和等)
└─────────────────────────────────────┘
Binlog 类型:
- Insert Binlog:插入数据
files/insert_log/{collection_id}/{partition_id}/{segment_id}/{field_id}/
- Delete Binlog:删除数据
files/delta_log/{collection_id}/{partition_id}/{segment_id}/
- Stats Binlog:统计信息
files/stats_log/{collection_id}/{partition_id}/{segment_id}/{field_id}/
3.3 数据压缩
Milvus 支持多种压缩算法:
type CompressionType int
const (
NoCompression CompressionType = iota
LZ4 // 快速压缩,压缩率中等
Zstd // 高压缩率,速度较慢
Snappy // 平衡方案
)
// 压缩数据
func compress(data []byte, compressionType CompressionType) []byte {
switch compressionType {
case LZ4:
return lz4.Compress(data)
case Zstd:
return zstd.Compress(data)
case Snappy:
return snappy.Compress(data)
default:
return data
}
}
压缩效果:
| 数据类型 |
原始大小 |
LZ4 |
Zstd |
压缩率 |
| Float 向量 |
3072 字节 |
2800 字节 |
2500 字节 |
1.2x |
| Int64 ID |
8 字节 |
6 字节 |
5 字节 |
1.6x |
| 文本字段 |
1000 字节 |
400 字节 |
300 字节 |
3.3x |
四、对象存储集成
4.1 为什么选择对象存储?
传统存储 vs 对象存储:
| 特性 |
本地磁盘 |
对象存储 |
| 容量 |
有限(TB 级) |
无限(PB 级) |
| 成本 |
高 |
低(1/10) |
| 可靠性 |
需要 RAID |
11 个 9(99.999999999%) |
| 扩展性 |
困难 |
自动扩展 |
| 访问速度 |
快 |
较慢 |
Milvus 的策略:
- 热数据:内存 + SSD(快速访问)
- 冷数据:对象存储(低成本)
4.2 支持的对象存储
# MinIO(开源,本地部署)
minio:
address: localhost:9000
bucketName: milvus-bucket
accessKeyID: minioadmin
secretAccessKey: minioadmin
# AWS S3
s3:
endpoint: s3.amazonaws.com
bucketName: my-milvus-bucket
region: us-west-2
accessKeyID: ${AWS_ACCESS_KEY}
secretAccessKey: ${AWS_SECRET_KEY}
# 阿里云 OSS
oss:
endpoint: oss-cn-hangzhou.aliyuncs.com
bucketName: my-milvus-bucket
accessKeyID: ${OSS_ACCESS_KEY}
secretAccessKey: ${OSS_SECRET_KEY}
4.3 数据布局
对象存储目录结构:
milvus-bucket/
├── files/
│ ├── insert_log/
│ │ └── {collection_id}/
│ │ └── {partition_id}/
│ │ └── {segment_id}/
│ │ ├── {field_id}/
│ │ │ └── {log_id} # Insert Binlog
│ │ └── ...
│ ├── delta_log/
│ │ └── {collection_id}/
│ │ └── {partition_id}/
│ │ └── {segment_id}/
│ │ └── {log_id} # Delete Binlog
│ └── index/
│ └── {collection_id}/
│ └── {partition_id}/
│ └── {segment_id}/
│ └── {field_id}/
│ └── {index_id} # 索引文件
└── meta/
└── ... # 元数据快照
4.4 数据读取优化
预取(Prefetch)
// QueryNode 的数据预取
type DataLoader struct {
cache *LRUCache
prefetcher *Prefetcher
}
func (dl *DataLoader) LoadSegment(segmentID int64) (*Segment, error) {
// 1. 检查缓存
if seg, ok := dl.cache.Get(segmentID); ok {
return seg, nil
}
// 2. 从对象存储加载
binlogPaths := dl.getBinlogPaths(segmentID)
// 3. 并行加载多个字段
var wg sync.WaitGroup
fields := make(map[int64]*FieldData)
for fieldID, path := range binlogPaths {
wg.Add(1)
go func(fid int64, p string) {
defer wg.Done()
data := dl.loadFromStorage(p)
fields[fid] = data
}(fieldID, path)
}
wg.Wait()
// 4. 构建 Segment
segment := NewSegment(segmentID, fields)
// 5. 加入缓存
dl.cache.Put(segmentID, segment)
// 6. 预取相邻 Segment
dl.prefetcher.Prefetch(segmentID + 1)
return segment, nil
}
MMap(内存映射)
Milvus 2.3+ 支持 MMap,降低内存占用:
// 使用 MMap 加载数据
func (qn *QueryNode) loadSegmentWithMMap(segmentID int64) error {
binlogPath := qn.getBinlogPath(segmentID)
// 1. 下载到本地磁盘
localPath := qn.downloadToLocal(binlogPath)
// 2. MMap 映射到虚拟内存
file, err := os.Open(localPath)
if err != nil {
return err
}
data, err := syscall.Mmap(
int(file.Fd()),
0,
fileSize,
syscall.PROT_READ,
syscall.MAP_SHARED,
)
if err != nil {
return err
}
// 3. 按需加载(操作系统自动管理)
segment := NewSegmentFromMMap(segmentID, data)
qn.segments[segmentID] = segment
return nil
}
MMap 优势:
- 内存占用降低 50-70%
- 操作系统自动管理页面换入换出
- 适合超大数据集
五、数据删除与更新
5.1 删除机制
Milvus 使用标记删除(Soft Delete):
# 删除数据
collection.delete(expr="id in [1, 2, 3]")
# 或者按条件删除
collection.delete(expr="score < 0.5")
删除流程:
1. Proxy 接收删除请求
↓
2. 写入 Delete Binlog(Delta Log)
↓
3. QueryNode 更新 Bloom Filter
↓
4. 查询时过滤已删除数据
↓
5. 压缩时物理删除
Delete Binlog 结构:
type DeleteLog struct {
PrimaryKeys []int64 // 被删除的主键
Timestamps []uint64 // 删除时间戳
}
// QueryNode 应用删除
func (qn *QueryNode) applyDelete(deleteLog *DeleteLog) {
for i, pk := range deleteLog.PrimaryKeys {
ts := deleteLog.Timestamps[i]
// 更新 Bloom Filter
qn.bloomFilter.Add(pk)
// 记录删除时间戳
qn.deletedPKs[pk] = ts
}
}
// 查询时过滤
func (qn *QueryNode) search(query *SearchRequest) *SearchResult {
results := qn.vectorSearch(query)
// 过滤已删除的数据
filtered := make([]*SearchResult, 0)
for _, result := range results {
if !qn.isDeleted(result.PK, query.Timestamp) {
filtered = append(filtered, result)
}
}
return filtered
}
5.2 更新机制
Milvus 不支持直接更新,需要先删除再插入:
# 更新数据(先删后插)
def update_entity(collection, pk, new_data):
# 1. 删除旧数据
collection.delete(expr=f"id == {pk}")
# 2. 插入新数据
new_data["id"] = pk
collection.insert([new_data])
# 使用示例
update_entity(
collection,
pk=123,
new_data={
"embedding": [0.5] * 768,
"text": "Updated text",
"score": 0.99
}
)
为什么不支持原地更新?
- 向量索引难以高效更新
- 列式存储不适合随机写
- 删除+插入更简单可靠
六、数据一致性保证
6.1 WAL(Write-Ahead Log)
所有写操作先写 WAL,保证数据不丢失:
写入流程:
1. 数据写入 WAL(Pulsar/Kafka)✅ 持久化
2. 返回成功给客户端
3. 异步刷盘到对象存储
故障恢复:
1. 从 WAL 重放未刷盘的数据
2. 重建内存状态
3. 继续提供服务
6.2 时间戳与可见性
// 每条数据都有时间戳
type Entity struct {
PK int64
Data map[string]interface{}
Timestamp uint64 // 插入时间戳
}
// 查询时指定时间戳
type SearchRequest struct {
Vector []float32
TopK int
Timestamp uint64 // 查询时间戳(保证一致性)
}
// 只返回时间戳 <= 查询时间戳的数据
func (qn *QueryNode) search(req *SearchRequest) []*Entity {
allResults := qn.vectorSearch(req.Vector, req.TopK)
// 过滤未来数据
visible := make([]*Entity, 0)
for _, entity := range allResults {
if entity.Timestamp <= req.Timestamp {
visible = append(visible, entity)
}
}
return visible
}
6.3 一致性级别
from pymilvus import Collection
collection = Collection("documents")
# 强一致性:保证读到最新数据
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={"metric_type": "COSINE"},
limit=10,
consistency_level="Strong" # 强一致性
)
# 最终一致性:更高性能
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={"metric_type": "COSINE"},
limit=10,
consistency_level="Eventually" # 最终一致性
)
# 会话一致性:同一会话内一致
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={"metric_type": "COSINE"},
limit=10,
consistency_level="Session" # 会话一致性
)
七、实战:数据生命周期管理
7.1 数据过期(TTL)
from pymilvus import Collection, CollectionSchema
# 创建带 TTL 的集合
schema = CollectionSchema(
fields=[...],
description="Logs with TTL"
)
collection = Collection(
name="logs",
schema=schema,
properties={
"collection.ttl.seconds": 86400 # 1 天后自动删除
}
)
7.2 数据压缩策略
# 手动触发压缩
collection.compact()
# 等待压缩完成
collection.wait_for_compaction_completed()
# 查看压缩状态
state = collection.get_compaction_state()
print(f"Compaction state: {state}")
7.3 数据备份
# 导出数据
from pymilvus import utility
# 创建备份
utility.create_alias(
collection_name="documents",
alias="documents_backup_20241125"
)
# 导出到文件
collection.query(
expr="", # 导出所有数据
output_fields=["*"]
)
八、性能优化建议
8.1 Segment 大小优化
# 调整 Segment 大小
collection = Collection(
name="large_collection",
schema=schema,
properties={
"segment.maxSize": 1024, # 1GB(默认 512MB)
}
)
建议:
- 小数据集(< 1000 万):256MB
- 中等数据集(1000 万 - 1 亿):512MB(默认)
- 大数据集(> 1 亿):1GB
8.2 分片数优化
# 根据写入并发度设置分片数
collection = Collection(
name="high_throughput",
schema=schema,
shards_num=4 # 4 个分片,支持 4 个并发写入
)
建议:
- 低并发(< 1000 QPS):2 个分片
- 中等并发(1000-5000 QPS):4 个分片
- 高并发(> 5000 QPS):8 个分片
8.3 批量操作
# 批量插入(推荐)
batch_size = 1000
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
collection.insert(batch)
# 批量删除
pks_to_delete = [1, 2, 3, ..., 1000]
collection.delete(expr=f"id in {pks_to_delete}")
九、总结
核心要点
- 数据模型:Database → Collection → Partition → Segment
- 列式存储:提高查询效率和压缩率
- Binlog:持久化数据,支持故障恢复
- 对象存储:低成本、高可靠的存储方案
- 标记删除:软删除机制,延迟物理删除
✅ 合理设置分区,提高查询效率
✅ 使用批量操作,提升吞吐量
✅ 定期压缩,清理碎片
✅ 根据数据规模调整 Segment 大小
✅ 使用 MMap 降低内存占用