找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2006

积分

0

好友

277

主题
发表于 2025-12-30 17:43:26 | 查看: 20| 回复: 0

告别传统关键词匹配,用语义搜索解锁法律文档的深层含义

图:告别传统关键词匹配,用语义搜索解锁法律文档的深层含义

你是否曾为在海量文件(以法律文件为例)中寻找相关判例而头疼?传统的关键词搜索经常错过关键信息——毕竟法律文本的微妙之处往往不在于具体词汇,而在于概念之间的关系

今天,我们就来聊聊如何用 Python 构建一个闪电般快速的法律文档语义搜索系统。我将带你从零开始,基于澳大利亚高等法院的14.3万份判例文档,构建一个能从毫秒级响应数千并发查询的智能检索系统。

为什么需要向量搜索?

法律文本有其独特之处:

  • 高度专业化:术语密集,语境依赖性强
  • 长文档结构:单个判例可能长达数十页
  • 概念关联复杂:相似的判决理由可能用完全不同的词汇表达

传统的TF-IDF或BM25算法在这里显得力不从心。而向量搜索通过将文本转换为高维空间中的点,让相似的概念彼此靠近,从而实现了“理解语义”的检索。

但问题是:如何在保证检索质量的同时,处理如此大规模的数据?

第一步:选择嵌入模型,先看服务条款!

在开始编码之前,有一个关键步骤大多数开发者都会忽略:仔细阅读嵌入API的服务条款

当我用Claude帮我分析了几大主流嵌入服务提供商的服务条款后,发现了令人惊讶的差异:

提供商 训练数据使用 数据保留 基准测试 免费用户保护
Isaacus 仅当明确提供反馈时 保护所有用户 允许
Google 是(付费用户也会训练) 可能共享 有限制
OpenAI 是(可手动选择退出) 30天删除 允许
Voyage AI 是(可手动选择退出) 未明确 允许

重要发现:如果你处理的是敏感的法律或医疗文档,有些提供商默认会使用你的数据进行模型训练,甚至可能将数据共享或出售给第三方。

这就是为什么我最终选择了Isaacus——不仅因为它的服务条款对用户数据保护最为友好,而且它的kanon-2-embedder模型在后续测试中展现出了出色的性能。

为什么不直接用本地模型?

你可能会问:“为什么不直接在本地运行嵌入模型?” 这确实是一个选项,特别是对于敏感数据。

本地模型的优势:

  • 完全的数据控制
  • 无需担心API限速或价格变动
  • 可以针对特定领域进行微调

但权衡点是质量和便利性:

  • 最好的嵌入模型通常是专有模型,只能通过API访问
  • 本地推理需要GPU资源和管理成本

对于这个项目,我同时测试了API方案和本地方案。本地模型使用的是我在澳大利亚法律语料上微调的BGE-small模型,虽然速度快,但384维的嵌入空间在语义丰富度上无法与API模型的1792-3072维相媲美

第二步:高效获取嵌入向量

异步批处理:速度提升3-5倍

处理14.3万个文本块,如果顺序处理将花费数小时。我们需要异步处理,但要谨慎避免触发API限流:

import asyncio
import numpy as np
from isaacus import AsyncClient
import os

# 设置并发批处理数限制
max_concurrent_batches = 5
semaphore = asyncio.Semaphore(max_concurrent_batches)

async def process_batch(batch_texts, client, model_name, task_type):
    """处理单批文本的嵌入生成"""
    async with semaphore:  # 限流控制
        response = await client.embed(
            model=model_name,
            inputs=batch_texts,
            task=task_type
        )
    return np.array(response.embeddings, dtype=np.float32)

async def generate_embeddings_batch(corpus_texts, queries):
    """批量生成文档和查询的嵌入向量"""

    # 初始化客户端
    client = AsyncClient(api_key=os.getenv("ISAACUS_API_KEY"))

    # 准备批处理
    batch_size = 100  # 根据API限制调整
    corpus_batches = [corpus_texts[i:i+batch_size] 
                      for i in range(0, len(corpus_texts), batch_size)]

    # 并行处理文档嵌入
    corpus_tasks = []
    for batch in corpus_batches:
        task = process_batch(batch, client, "kanon-2-embedder", "retrieval/document")
        corpus_tasks.append(task)

    corpus_results = await asyncio.gather(*corpus_tasks)
    corpus_embeddings = np.vstack(corpus_results)

    # 生成查询嵌入
    query_response = await client.embed(
        model="kanon-2-embedder",
        inputs=queries,
        task="retrieval/query"
    )
    query_embeddings = np.array(query_response.embeddings, dtype=np.float32)

    # 保存到本地
    os.makedirs("embeddings", exist_ok=True)
    np.save("embeddings/corpus_embeddings.npy", corpus_embeddings)
    np.save("embeddings/query_embeddings.npy", query_embeddings)

    await client.close()
    return corpus_embeddings, query_embeddings

# 使用示例
corpus_texts = ["The High Court of Australia held that...", ...]  # 14.3万份文档
queries = ["What is the doctrine of precedent?", "How is negligence established?"]

# 运行异步函数
corpus_emb, query_emb = asyncio.run(generate_embeddings_batch(corpus_texts, queries))
print(f"生成了 {len(corpus_emb)} 个文档嵌入,维度: {corpus_emb.shape[1]}")

性能实测:不同提供商的对比

经过优化后,不同嵌入提供商的性能表现:

批处理速度(处理1000个法律文档):

  • 本地模型(auslaw-embed,384d):924 文本/秒(32GB GPU)
  • OpenAI(text-embedding-3-large,3072d):184 文本/秒
  • Isaacus(kanon-2-embedder,1792d):102 文本/秒
  • Google(gemini-embedding-001,3072d):19.8 文本/秒
  • Voyage AI(voyage-3-large,2048d):14 文本/秒

单查询延迟(用户体验的关键指标):

  • 本地模型:7ms 平均,15ms P95
  • Google:501.1ms 平均,662.3ms P95
  • OpenAI:1,114ms 平均,1,723ms P95
  • Isaacus:1,532ms 平均,2,097ms P95
  • Voyage AI:1,693ms 平均,7,657ms P95

注:P95表示95%的请求比这个时间快,是衡量用户体验的更好指标

第三步:256维度的神奇优化

Isaacus的kanon-2-embedder有一个独特特性:前几个维度携带了大部分语义信息。这让我们可以进行大幅度的维度裁剪:

import numpy as np

# 加载完整的嵌入向量
corpus_embeddings = np.load("embeddings/corpus_embeddings.npy")

# 仅使用前256个维度(从1792维裁剪)
corpus_256d = corpus_embeddings[:, :256].astype(np.float32)

print(f"原始维度: {corpus_embeddings.shape[1]}")
print(f"裁剪后维度: {corpus_256d.shape[1]}")
print(f"内存占用减少: {(1 - 256/1792) * 100:.1f}%")

优化效果惊人:

  • 🔥 搜索速度提升8.6倍:从53 q/s到459 q/s
  • 💾 内存占用减少7倍:从1,028 MB到140 MB
  • 📊 检索质量保留61%:recall@10指标

重要说明:这里的61% recall@10是相对于1792维全量检索的基准。在实际的RAG(检索增强生成)应用中,这通常足够了——因为检索只是第一步,后续还有重排序和生成步骤。

第四步:USearch实现CPU上的极速检索

现在进入最精彩的部分:如何在不使用GPU的情况下实现毫秒级检索

大多数向量搜索教程会推荐FAISS或Pinecone,但我发现了一个宝藏库:USearch。它通过SIMD优化(现代CPU的并行指令集)在纯CPU上实现了惊人的速度。

安装USearch

pip install usearch numpy

1. 基础版:多线程精确搜索

from usearch.index import search, MetricKind

# 使用8个线程进行批量搜索
matches = search(
    corpus_256d,         # 文档嵌入向量
    query_embeddings,    # 查询嵌入向量
    100,                 # 返回前100个结果
    MetricKind.Cos,      # 使用余弦相似度
    exact=True,          # 精确搜索
    threads=8            # 多线程并行
)

# 结果:374 q/s,相比单线程提升7倍

2. 进阶版:HNSW索引加速

对于查询频率高于更新的场景,构建索引是值得的:

from usearch.index import Index
import time

# 创建HNSW索引
index = Index(
    ndim=256,            # 向量维度
    metric=MetricKind.Cos, # 相似度度量
    connectivity=32,     # 连接数(越高质量越好,内存越大)
    expansion_add=200,   # 构建时的扩展数
    expansion_search=100 # 搜索时的扩展数
)

print("开始构建索引...")
start_time = time.time()

# 批量添加文档
for i, embedding in enumerate(corpus_256d):
    index.add(i, embedding)

    # 进度显示
    if (i + 1) % 10000 == 0:
        print(f"已索引 {i+1}/{len(corpus_256d)} 个文档")

build_time = time.time() - start_time
print(f"索引构建完成,耗时: {build_time:.1f}秒")

# 保存索引供后续使用
index.save("legal_search_index.usearch")

3. 终极优化:全栈配置

# 准备半精度(16位)的256维向量
corpus_256d_half = corpus_embeddings[:, :256].astype(np.float16)

index = Index(
    ndim=256,
    metric=MetricKind.Cos,
    dtype="f16",           # 半精度,节省50%内存
    connectivity=32,
    expansion_add=200,
    expansion_search=100
)

# 快速构建(143K文档仅需59秒)
for i, emb in enumerate(corpus_256d_half):
    index.add(i, emb)

# 查询速度:2,880 q/s!

性能对比:优化前后的惊人差异

优化级别 查询速度 内存占用 Recall@10 适用场景
基准线 53 q/s 1,028 MB 100% 最高精度需求
多线程 374 q/s 1,028 MB 100% 中等规模,需要精确结果
HNSW索引 993 q/s 415 MB 98.6% 生产环境,良好平衡
全栈优化 2,880 q/s 70 MB 61% 大规模RAG,快速初筛

并发处理能力对比

基准系统(53 q/s):

  • 1个用户:19ms响应(良好)
  • 100个并发用户:1.9秒响应(勉强接受)
  • 1,000个并发用户:19秒响应(不可接受)

优化系统(2,880 q/s):

  • 1个用户:0.35ms响应(极快)
  • 100个并发用户:35ms响应(优秀)
  • 1,000个并发用户:347ms响应(良好)
  • 10,000个并发用户:3.5秒响应(可接受)

完整生产级代码实现

下面是一个完整的、可投入生产的法律文档搜索系统:

import numpy as np
from usearch.index import Index, search, MetricKind
from pathlib import Path
from typing import Optional, Union, List, Tuple
import time
import json

class LegalDocumentSearcher:
    """法律文档语义搜索引擎"""

    def __init__(
        self,
        corpus_embeddings: np.ndarray,
        optimization_level: str = "balanced",
        index_path: Optional[str] = None
    ):
        """
        初始化搜索引擎

        参数:
            corpus_embeddings: 文档嵌入向量,形状为 (n_docs, n_dims)
            optimization_level: 优化级别 - "accuracy"|"balanced"|"speed"
            index_path: 索引保存路径(可选)
        """
        self.corpus_embeddings = corpus_embeddings
        self.optimization_level = optimization_level
        self.index_path = index_path

        # 根据优化级别配置参数
        self._configure_optimization()

        # 准备文档向量
        self._prepare_corpus()

        # 构建或加载索引
        self.index = self._setup_index()

    def _configure_optimization(self):
        """根据优化级别配置参数"""
        configs = {
            "speed": {
                "dimensions": 256,
                "use_index": True,
                "dtype": "f16",
                "connectivity": 32,
                "expansion_add": 200,
                "expansion_search": 100
            },
            "balanced": {
                "dimensions": None,  # 使用所有维度
                "use_index": True,
                "dtype": "f32",
                "connectivity": 32,
                "expansion_add": 200,
                "expansion_search": 100
            },
            "accuracy": {
                "dimensions": None,
                "use_index": False,
                "dtype": "f32",
                "connectivity": None,
                "expansion_add": None,
                "expansion_search": None
            }
        }

        if self.optimization_level not in configs:
            raise ValueError(f"优化级别必须是: {list(configs.keys())}")

        config = configs[self.optimization_level]
        for key, value in config.items():
            setattr(self, key, value)

    def _prepare_corpus(self):
        """预处理文档嵌入向量"""
        if self.dimensions:
            # 裁剪维度
            self.corpus_processed = self.corpus_embeddings[:, :self.dimensions]
            print(f"维度裁剪: {self.corpus_embeddings.shape[1]} -> {self.dimensions}")
        else:
            self.corpus_processed = self.corpus_embeddings

        # 转换数据类型
        if self.dtype == "f16":
            self.corpus_processed = self.corpus_processed.astype(np.float16)
        else:
            self.corpus_processed = self.corpus_processed.astype(np.float32)

        # 确保内存连续(SIMD优化需要)
        self.corpus_processed = np.ascontiguousarray(self.corpus_processed)

    def _setup_index(self):
        """设置索引(构建或加载)"""
        if not self.use_index:
            return None

        # 如果有保存的索引,直接加载
        if self.index_path and Path(self.index_path).exists():
            print(f"加载索引: {self.index_path}")
            index = Index.restore(self.index_path)

            # 加载元数据
            meta_path = f"{self.index_path}.meta.json"
            if Path(meta_path).exists():
                with open(meta_path, 'r') as f:
                    self.metadata = json.load(f)

            return index

        # 否则构建新索引
        print("构建HNSW索引...")
        start_time = time.time()

        ndim = self.dimensions or self.corpus_embeddings.shape[1]
        index = Index(
            ndim=ndim,
            metric=MetricKind.Cos,
            dtype=self.dtype,
            connectivity=self.connectivity,
            expansion_add=self.expansion_add,
            expansion_search=self.expansion_search
        )

        # 添加文档向量
        n_docs = len(self.corpus_processed)
        for i, embedding in enumerate(self.corpus_processed):
            index.add(i, embedding)
            if (i + 1) % 10000 == 0:
                print(f"进度: {i+1}/{n_docs}")

        build_time = time.time() - start_time
        print(f"索引构建完成,耗时: {build_time:.1f}秒")

        # 保存索引
        if self.index_path:
            index.save(self.index_path)
            self._save_metadata()

        return index

    def _save_metadata(self):
        """保存索引元数据"""
        metadata = {
            "optimization_level": self.optimization_level,
            "dimensions": self.dimensions,
            "dtype": self.dtype,
            "corpus_size": len(self.corpus_processed),
            "embedding_dim": self.corpus_embeddings.shape[1],
            "build_time": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        if self.index_path:
            meta_path = f"{self.index_path}.meta.json"
            with open(meta_path, 'w') as f:
                json.dump(metadata, f, indent=2)

    def search(
        self,
        query_embedding: np.ndarray,
        k: int = 10,
        include_scores: bool = False
    ) -> Union[List[int], Tuple[List[int], List[float]]]:
        """
        搜索相似文档

        参数:
            query_embedding: 查询嵌入向量
            k: 返回的结果数量
            include_scores: 是否包含相似度分数

        返回:
            文档索引列表(或包含分数的元组)
        """
        # 确保查询向量维度匹配
        if self.dimensions:
            query_processed = query_embedding[:self.dimensions]
        else:
            query_processed = query_embedding

        query_processed = query_processed.astype(np.float32)

        if self.use_index and self.index:
            # 使用HNSW索引搜索
            matches = self.index.search(query_processed, k)

            if include_scores:
                # 余弦相似度转换为距离
                scores = [1 - dist for dist in matches.distances]
                return matches.keys.tolist(), scores
            return matches.keys.tolist()
        else:
            # 精确搜索(多线程)
            matches = search(
                self.corpus_processed,
                query_processed.reshape(1, -1),
                k,
                MetricKind.Cos,
                exact=True,
                threads=8
            )

            if include_scores:
                scores = [1 - dist for dist in matches.distances[0]]
                return matches.keys[0].tolist(), scores
            return matches.keys[0].tolist()

    def batch_search(
        self,
        query_embeddings: np.ndarray,
        k: int = 10
    ) -> List[List[int]]:
        """批量搜索"""
        results = []
        for query in query_embeddings:
            doc_indices = self.search(query, k)
            results.append(doc_indices)
        return results

# 使用示例
def main():
    # 加载嵌入向量
    print("加载文档嵌入向量...")
    corpus_embeddings = np.load("embeddings/corpus_embeddings.npy")

    # 创建搜索器(使用平衡模式)
    print("初始化搜索引擎...")
    searcher = LegalDocumentSearcher(
        corpus_embeddings=corpus_embeddings,
        optimization_level="balanced",  # 速度与质量的平衡
        index_path="indices/legal_search_balanced.usearch"
    )

    # 示例查询
    from isaacus import AsyncClient
    import asyncio

    async def test_search():
        client = AsyncClient(api_key=os.getenv("ISAACUS_API_KEY"))

        # 测试查询
        test_queries = [
            "What constitutes negligence in Australian law?",
            "How is the doctrine of precedent applied?",
            "What are the elements of a valid contract?"
        ]

        # 生成查询嵌入
        response = await client.embed(
            model="kanon-2-embedder",
            inputs=test_queries,
            task="retrieval/query"
        )
        query_embeddings = np.array(response.embeddings, dtype=np.float32)

        await client.close()

        # 执行搜索
        for i, query in enumerate(test_queries):
            print(f"\n查询: '{query}'")
            start_time = time.time()
            results, scores = searcher.search(query_embeddings[i], k=5, include_scores=True)
            search_time = (time.time() - start_time) * 1000

            print(f"搜索耗时: {search_time:.2f}ms")
            print("最相关文档:")
            for j, (doc_idx, score) in enumerate(zip(results, scores)):
                print(f"  {j+1}. 文档 #{doc_idx} (相似度: {score:.3f})")

    # 运行测试
    asyncio.run(test_search())

if __name__ == "__main__":
    main()

如何选择适合你的配置?

准确度优先模式(optimization_level="accuracy")

适用场景:

  • 法律合规要求100%召回率
  • 文档数量较少(<10万)
  • 研究性应用,每个结果都至关重要

配置:

  • 使用完整维度(1792维)
  • 精确搜索(无索引)
  • 多线程并行

平衡模式(optimization_level="balanced")

适用场景:

  • 生产环境法律搜索工具
  • 需要接近完美的结果(>95%召回率)
  • 能接受几秒钟的索引构建时间

配置:

  • 使用完整维度
  • HNSW索引(连接数32)
  • 推荐用于大多数法律应用

速度优先模式(optimization_level="speed")

适用场景:

  • 面向消费者的应用
  • 内存受限环境(<200MB)
  • RAG系统中的第一轮快速检索
  • 需要处理数千并发用户

配置:

  • 256维裁剪
  • 半精度存储(f16)
  • HNSW索引
  • 最高查询速度

写在最后

  1. 服务条款很重要:处理敏感数据前,务必阅读嵌入API的服务条款
  2. 维度裁剪很有效:Isaacus嵌入的前256维保留了大部分语义信息
  3. CPU也能很快:USearch通过SIMD优化在CPU上实现毫秒级检索
  4. 权衡是必要的:在速度、内存和准确度之间找到适合你用例的平衡点

对于法律搜索应用,我推荐平衡模式——993 q/s的速度加上98.6%的召回率,既能提供优秀的用户体验,又能保证检索质量。

但如果你在构建RAG系统,需要快速筛选出前50-100个相关文档供后续处理,那么速度优先模式的2,880 q/s吞吐量将带来质的飞跃。

记住:一个能在0.35ms内返回61%相关结果的系统,往往比需要19ms才能返回100%结果的系统更有用——特别是在多级处理管道中。

现在,你已经拥有了构建高性能法律文档搜索系统所需的所有工具和知识。从选择合适的嵌入提供商开始,到实施维度裁剪和USearch优化,每一步都可以根据你的具体需求进行调整。

参考资料

[1] USearch GitHub仓库: https://github.com/unum-cloud/USearch

[2] Isaacus文档: https://docs.isaacus.com/

[3] Open Australian Legal Corpus: https://github.com/justinpombrio/open-australian-legal-corpus

[4] MLEB法律检索基准: https://isaacus.com/mleb

想获取更多关于 人工智能 和数据处理的技术实战与深度思考,欢迎持续关注 云栈社区




上一篇:Python JSON数据处理全攻略:35个实战技巧覆盖全场景
下一篇:TPU核心团队被英伟达锁定,200亿美元剑指AI推理市场
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 09:07 , Processed in 0.366533 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表