找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3925

积分

0

好友

539

主题
发表于 昨天 17:47 | 查看: 8| 回复: 0

本文记录了在 agent-io 项目中,将 AI Agent 的记忆存储方案从传统的 SQLite+FTS5 升级为 LanceDB 向量数据库的完整过程。你将看到详细的代码实现、API 设计以及我们遇到的实际问题和解决方案。

背景

在开发 AI Agent 项目时,我们需要一个高效的记忆存储系统来保存对话历史、用户偏好和知识库。最初选择了 SQLite + FTS5 的组合,原因很简单:

  • SQLite 轻量、成熟、易于部署
  • FTS5 提供了不错的全文搜索能力

但随着 AI Agent 功能的扩展,尤其是对语义理解需求的增长,我们发现这个传统方案遇到了明显的瓶颈:

需求 SQLite+FTS5 LanceDB
关键词搜索 ✅ 支持 ✅ 支持
语义搜索 ❌ 需要额外向量库 ✅ 原生支持
相似度排序 ❌ 不支持 ✅ 支持
Schema 变更 😐 需要迁移 ✅ 灵活
部署复杂度 低(单二进制)

于是,我们决定将存储后端迁移到 LanceDB —— 一个专为 AI/ML 场景设计的嵌入式向量数据库。

《Rust AI Agent Development》书籍封面

为什么选择 LanceDB?

1. 向量原生设计

LanceDB 从底层开始就为向量搜索优化,使用 Apache Arrow 作为数据格式,支持高效的列式存储和向量化计算:

// 向量列定义
Field::new(
    "embedding",
    DataType::FixedSizeList(
        Arc::new(Field::new("item", DataType::Float32, true)),
        1536,  // OpenAI embedding 维度
    ),
    true,
)

2. 无服务器架构

与 Pinecone、Weaviate 等需要独立部署的服务不同,LanceDB 是嵌入式数据库:

  • 无需额外的服务进程
  • 数据存储为本地文件
  • 零运维成本

3. 统一的存储接口

一个数据库同时支持:

  • 结构化数据(元数据、时间戳等)
  • 向量数据(embeddings)
  • 全文搜索

这意味着我们不再需要维护多个分散的存储系统,简化了整体架构。

技术实现

依赖配置

首先,需要在 Cargo.toml 中添加相关依赖,并确保 arrow 系列包的版本一致以避免兼容性问题。

# Cargo.toml
[dependencies]
lancedb = "0.26"
arrow = "57"
arrow-array = "57"
arrow-schema = "57"

Schema 设计

为了满足 AI Agent 记忆存储的需求,我们设计了包含多种数据类型的 Schema:

fn schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("content", DataType::Utf8, false),
        Field::new("embedding", DataType::FixedSizeList(
            Arc::new(Field::new("item", DataType::Float32, true)),
            1536,
        ), true),
        Field::new("memory_type", DataType::Utf8, false),
        Field::new("metadata", DataType::Utf8, true),
        Field::new("created_at", DataType::Int64, false),
        Field::new("last_accessed", DataType::Int64, true),
        Field::new("importance", DataType::Float32, false),
        Field::new("access_count", DataType::UInt32, false),
    ]))
}

初始化连接

LanceDB 支持内存模式和文件持久化两种模式,根据使用场景灵活选择。

pub struct LanceDbStore {
    table: Arc<Table>,
}

impl LanceDbStore {
    // 内存模式 - 适合测试和临时存储
    pub async fn new() -> Result<Self> {
        Self::open_uri("memory://agent_io_memories").await
    }

    // 文件模式 - 适合生产环境持久化
    pub async fn open<P: Into<PathBuf>>(path: P) -> Result<Self> {
        let path = path.into();
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        Self::open_uri(&path.to_string_lossy()).await
    }

    async fn open_uri(uri: &str) -> Result<Self> {
        let db = connect(uri).execute().await?;

        let table = if db.table_names().execute().await?
            .contains(&"memories".to_string())
        {
            db.open_table("memories").execute().await?
        } else {
            db.create_empty_table("memories", Self::schema())
                .execute().await?
        };

        Ok(Self { table: Arc::new(table) })
    }
}

数据写入

LanceDB 使用 Arrow RecordBatch 作为数据格式,需要将我们的结构体数据转换为列式存储。这对习惯于操作行数据(如 SQL)的开发者来说,是个需要适应的点。

async fn add(&self, entry: MemoryEntry) -> Result<String> {
    let id = entry.id.clone();
    let schema = Self::schema();

    // 构建各个列
    let id_array = StringArray::from(vec![entry.id]);
    let content_array = StringArray::from(vec![entry.content]);

    // 向量列需要特殊处理
    let embedding_array = if let Some(ref embedding) = entry.embedding {
        FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
            vec![Some(embedding.iter().map(|&v| Some(v)).collect())],
            1536,
        )
    } else {
        FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
            vec![None],
            1536,
        )
    };

    // 组装 RecordBatch
    let batch = RecordBatch::try_new(schema, vec![
        Arc::new(id_array),
        Arc::new(content_array),
        Arc::new(embedding_array),
        // ... 其他列
    ])?;

    // 写入数据
    self.table
        .add(RecordBatchIterator::new(
            vec![Ok(batch.clone())],
            batch.schema()
        ))
        .execute()
        .await?;

    Ok(id)
}

全文搜索

虽然迁移到了向量数据库,但传统的关键词搜索在特定场景下依然有其价值,比如精确匹配名称或ID。

async fn search(&self, query: &str, limit: usize) -> Result<Vec<MemoryEntry>> {
    let batches = self.table
        .query()
        .only_if(format!("content LIKE '%{}%'", query))
        .limit(limit)
        .execute()
        .await?
        .try_collect::<Vec<_>>()
        .await?;

    // 解析结果...
    Ok(parse_batches(batches))
}

语义搜索(核心亮点)

这是我们迁移到 LanceDB 的主要收益 —— 获得原生的、基于向量相似度的语义搜索能力,这极大地增强了 AI Agent 理解用户意图的能力。

async fn search_by_embedding(
    &self,
    embedding: &[f32],
    limit: usize,
    threshold: f32,
) -> Result<Vec<MemoryEntry>> {
    let batches = self.table
        .query()
        .nearest_to(embedding)  // 核心API:向量搜索
        .limit(limit * 2)
        .execute()
        .await?
        .try_collect::<Vec<_>>()
        .await?;

    let mut results: Vec<(MemoryEntry, f32)> = Vec::new();

    for batch in batches {
        for i in 0..batch.num_rows() {
            let entry = parse_batch_row(&batch, i)?;

            // LanceDB 返回 _distance 列表示距离
            let similarity = batch.column_by_name("_distance")
                .and_then(|col| col.as_any().downcast_ref::<Float32Array>())
                .map(|arr| 1.0 - arr.value(i))  // 距离转相似度
                .unwrap_or(0.0);

            if similarity >= threshold {
                results.push((entry, similarity));
            }
        }
    }

    // 按相似度排序
    results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Equal));
    results.truncate(limit);

    Ok(results.into_iter().map(|(e, _)| e).collect())
}

数据读取

从 Arrow RecordBatch 中提取数据并转换回我们的业务结构体,是使用过程中另一个关键步骤。

fn parse_batch_row(batch: &RecordBatch, i: usize) -> Result<MemoryEntry> {
    // 普通字符串列
    let id = batch.column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .map(|arr| arr.value(i).to_string())
        .unwrap_or_default();

    // 向量列需要特殊处理
    let embedding = batch.column(2)
        .as_any()
        .downcast_ref::<FixedSizeListArray>()
        .and_then(|arr| {
            if arr.is_null(i) { return None; }
            arr.value(i)
                .as_any()
                .downcast_ref::<Float32Array>()
                .map(|v| v.values().to_vec())
        });

    // ... 其他字段解析
    Ok(MemoryEntry { id, embedding, ... })
}

踩坑记录

任何技术迁移都不会一帆风顺,以下是我们在集成 LanceDB 时遇到的主要问题。

1. Arrow 版本兼容性

问题:不同版本的 arrow crate 之间存在类型不兼容,导致编译错误或运行时问题。
解决:确保所有 arrow 相关依赖使用完全相同的版本号。

arrow = "57"
arrow-array = "57"
arrow-schema = "57"

2. FixedSizeList 的 null 处理

问题:当 embedding 字段为 None 时,需要创建正确类型的 null 数组,编译器类型推断有时会失败。
解决:使用显式的类型标注来帮助编译器。

FixedSizeListArray::from_iter_primitive::<Float32Type, Option<Option<f32>>, _>(
    vec![None],
    1536,
)

3. RecordBatchIterator 包装

问题add 方法要求传入实现了 RecordBatchReader trait 的类型,不能直接传入单个 RecordBatch
解决:使用 RecordBatchIterator 对数据进行包装。

.add(RecordBatchIterator::new(
    vec![Ok(batch.clone())],
    batch.schema()
))

总结

从 SQLite+FTS5 迁移到 LanceDB,我们为 AI Agent 项目带来了实质性的提升:

  1. 语义搜索能力:AI Agent 现在可以理解用户查询的意图,而不仅仅是进行关键词匹配,这尤其对 向量数据 的检索至关重要。
  2. 简化架构:不再需要维护独立的向量数据库服务,所有存储需求在一个嵌入式库中得到满足。
  3. 统一 API:通过一套接口处理所有类型的查询(关键词、语义、混合查询),代码更清晰。
  4. 保持轻量:依然是嵌入式数据库,零外部依赖和运维成本,非常适合从原型到生产的不同阶段。

如果你的 AI 应用场景涉及 RAG(检索增强生成)、语义搜索或向量存储,并且希望保持架构简单,那么 LanceDB 是一个非常值得考虑的“刚刚好”的解决方案。它平衡了能力与复杂度,让开发者能更专注于业务逻辑本身。

如果你也在探索类似的存储方案或对 Rust 开发 AI 应用感兴趣,欢迎到 云栈社区 交流讨论。




上一篇:CSS布局一行代码实战:10种现代网页布局方案解析
下一篇:软考高级架构师论文指南:三步精准回应子题目,拒绝跑题
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-10 09:15 , Processed in 0.576027 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表