找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2088

积分

0

好友

269

主题
发表于 3 小时前 | 查看: 0| 回复: 0

本文深入剖析 QAnything 开源项目的后端服务,包括核心类、接口、逻辑实现,揭示其如何实现高效的检索增强生成(RAG系统,从文档处理到智能问答的完整技术链路。

核心数据结构

一句话职责 关键动作 输出
LocalFile 「把一份原始文件变成可检索的向量块」 解析 → 分块 → OCR(如有)→ 向量化 → 打包成 (doc, emb) 列表 一堆 Document + 对应 embedding
LocalDocQA 「拿这些向量块去回答用户问题」 召回 → 重排序 → 截断 → 拼 Prompt → 调 LLM → 流式返回答案 生成器,每次 yield 一段答案 + 引用的原文

LocalDocQA类

LocalDocQA 是 QAnything 系统的核心引擎,负责协调整个 RAG 流程。该类采用模块化设计,将文档处理、向量存储、语义检索、重排序和 LLM 生成等关键功能有机整合。

核心架构:

class LocalDocQA:
    def __init__(self):
        self.llm: object = None                    # LLM 引擎
        self.embeddings: object = None             # 向量化模型
        self.top_k: int = VECTOR_SEARCH_TOP_K      # 检索数量
        self.chunk_size: int = CHUNK_SIZE          # 文本切片大小
        self.score_threshold: int = VECTOR_SEARCH_SCORE_THRESHOLD  # 相似度阈值
        self.milvus_kbs: List[MilvusClient] = []   # Milvus 客户端池
        self.milvus_summary: KnowledgeBaseManager = None  # 知识库管理
        self.mode: str = None                      # 运行模式
        self.local_rerank_service_url = "http://0.0.0.0:8776"  # 重排序服务
        self.ocr_url = 'http://0.0.0.0:8010/ocr'   # OCR 服务

LLM 引擎集成与向量存储

def create_milvus_collection(self, user_id, kb_id, kb_name):
    milvus_kb = MilvusClient(self.mode, user_id, [kb_id])
    self.milvus_kbs.append(milvus_kb)
    self.milvus_summary.new_milvus_base(kb_id, user_id, kb_name)

def match_milvus_kb(self, user_id, kb_ids):
    # 智能匹配已存在的 Milvus 客户端
    for kb in self.milvus_kbs:
        if user_id == kb.user_id and kb_ids == kb.kb_ids:
            return kb

    # 不存在则创建新的客户端
    milvus_kb = MilvusClient(self.mode, user_id, kb_ids)
    self.milvus_kbs.append(milvus_kb)
    return milvus_kb

文档处理与向量化

async def insert_files_to_milvus(self, user_id, kb_id, local_files: List[LocalFile]):
    milvus_kv = self.match_milvus_kb(user_id, [kb_id])
    success_list = []
    failed_list = []
    for local_file in local_files:
        try:
            # Step 1: 文档解析与切片
            local_file.split_file_to_docs(self.get_ocr_result)
            content_length = sum([len(doc.page_content) for doc in local_file.docs])

            # Step 2: 向量化处理
            local_file.create_embedding()

            # Step 3: 向量存储
            ret = await milvus_kv.insert_files(
                local_file.file_id, 
                local_file.file_name, 
                local_file.file_path,
                local_file.docs, 
                local_file.embs
            )

            if ret:
                success_list.append(local_file)
                self.milvus_summary.update_file_status(local_file.file_id, status='green')
            else:
                failed_list.append(local_file)
                self.milvus_summary.update_file_status(local_file.file_id, status='yellow')

        except Exception as e:
            # 详细的错误处理和状态更新
            self.milvus_summary.update_file_status(local_file.file_id, status='red')
            failed_list.append(local_file)

智能检索与重排序

def get_source_documents(self, queries, milvus_kb, cosine_thresh=None, top_k=None):
    if not top_k:
        top_k = self.top_k

    # 1. 查询向量化
    embs = self.embeddings._get_len_safe_embeddings(queries)

    # 2. 批量向量搜索
    t1 = time.time()
    batch_result = milvus_kb.search_emb_async(embs=embs, top_k=top_k, queries=queries)
    t2 = time.time()
    debug_logger.info(f"milvus search time: {t2 - t1}")

    # 3. 结果整理与元数据增强
    source_documents = []
    for query, query_docs in zip(queries, batch_result):
        for doc in query_docs:
            doc.metadata['retrieval_query'] = query
            doc.metadata['embed_version'] = self.embeddings.embed_version
            source_documents.append(doc)

    # 4. 相似度过滤
    if cosine_thresh:
        source_documents = [item for item in source_documents 
                          if float(item.metadata['score']) > cosine_thresh]

    return source_documents
def rerank_documents_for_local(self, query, source_documents):
    # 查询长度限制:超过300 tokens 跳过重排序

LLM集成与答案生成

@get_time
def get_knowledge_based_answer(self, query, milvus_kb, chat_history=None, 
                              streaming: bool = STREAMING, rerank: bool = False):
    if chat_history is None:
        chat_history = []

    retrieval_queries = [query]

    # 1. 文档检索
    source_documents = self.get_source_documents(retrieval_queries, milvus_kb)
    deduplicated_docs = self.deduplicate_documents(source_documents)
    retrieval_documents = sorted(deduplicated_docs, 
                              key=lambda x: x.metadata['score'], 
                              reverse=True)

    # 2. 重排序(可选)
    if rerank and len(retrieval_documents) > 1:
        retrieval_documents = self.rerank_documents(query, retrieval_documents)

    # 3. 上下文处理
    source_documents = self.reprocess_source_documents(
        query=query,
        source_docs=retrieval_documents,
        history=chat_history,
        prompt_template=PROMPT_TEMPLATE
    )

    # 4. Prompt 生成
    prompt = self.generate_prompt(
        query=query,
        source_docs=source_documents,
        prompt_template=PROMPT_TEMPLATE
    )

    # 5. LLM 生成答案(支持流式)
    t1 = time.time()
    for answer_result in self.llm.generatorAnswer(
        prompt=prompt,
        history=chat_history,
        streaming=streaming
    ):
        resp = answer_result.llm_output["answer"]
        prompt = answer_result.prompt
        history = answer_result.history

        # 确保历史记录正确
        history[-1][0] = query

        response = {
            "query": query,
            "prompt": prompt,
            "result": resp,
            "retrieval_documents": retrieval_documents,
            "source_documents": source_documents
        }
        yield response, history

    t2 = time.time()
    debug_logger.info(f"LLM time: {t2 - t1}")

LocalFile类

LocalFile 是 QAnything 的文档「预处理工厂」,它只做三件事,却决定了后续检索与问答的质量天花板:
1、吃 进来一个文件(或 URL);
2、拆 成语义完整的文本块;
3、吐 出带嵌入向量的 List,供 LocalDocQA 直接入库与检索。

class LocalFile:
    def __init__(self, user_id, kb_id, file, file_id, file_name, embedding, ...):
        # 保存上传文件到本地;记录 user/kb/文件元数据
        ...
    def split_file_to_docs(self, ocr_engine: Callable, ...):
        # 按扩展名选 Loader → 中文切分 → 二次递归切分 → 注入 metadata
        ...
    def create_embedding(self):
        # 批量生成 embedding,与 docs 顺序严格对齐
        self.embs = self.emb_infer._get_len_safe_embeddings(
            [doc.page_content for doc in self.docs])

QA后端接口实现

知识库

1、创建知识库

接收 user_id + kb_name
├─ MySQL 插入一条 kb 记录(状态 initializing)
├─ LocalDocQA.create_milvus_collection() → Milvus 创建分区
└─ 返回 kb_id(UUID)
  • 幂等:同名库重复调用会返回已存在 kb_id。
  • 事务:MySQL 成功才创建 Milvus 分区,保证元数据与向量存储一致。

2、列表、重命名、删除

  • 列表:带分页、总条数,支持按 updated_at 排序。
  • 重命名:仅更新 MySQL 字段,Milvus 分区名不变(解耦)。
  • 删除:
    a.级联删除 Milvus 分区数据;
    b.删除本地文件目录;
    c.删除 MySQL 记录;
    d.异步清理客户端缓存;

文件操作

1、本地文件、网页上传

async def upload_files(request):
    user_id   = request.form.get('user_id')
    kb_id     = request.form.get('kb_id')
    files     = request.files.getlist('files')
    local_files = []
    for f in files:
        lf = LocalFile(user_id, kb_id, f, file_id=uuid4(),
                       file_name=f.name, embedding=embed_instance)
        local_files.append(lf)
    await local_doc_qa.insert_files_to_milvus(user_id, kb_id, local_files)
    return json({'success': len(success_list), 'failed': len(failed_list)})

多文件并发解析,异常单文件粒度捕获,失败文件写 status=red。 秒级返回,解析与向量化后置到后台线程(对前端无阻塞)。

2、列表、删除、其它

端点 亮点
/list_files 分页 + 状态过滤(green/yellow/red),前端实时展示解析进度
/delete_files 支持批量 file_id,事务内同时删 Milvus 向量 & 磁盘文件
/get_total_status 一条 SQL 聚合出「库/文件/异常」总数,供管理看板
/clean_files_by_status 定时任务入口,一键回收失败文件占用的向量空间

QA流程

对话接口 /local_doc_chat —— RAG 灵魂

async def local_doc_chat(request):
    req = request.json
    async for chunk, history in local_doc_qa.get_knowledge_based_answer(
            req['question'], milvus_kb, streaming=req['streaming']):
        yield f"data: {json.dumps(chunk)}\n\n"

用户问题 → Embedding → Milvus 粗排(top-k=40)→ BCE Reranker 精排 → 阈值过滤 → 拼 Prompt → LLM 流式回答

总结

QAnything 用 14 个接口勾勒出「知识库 → 文件 → 向量 → 对话」的完整闭环:

  • 零冗余:每个端点只干一件事;
  • 高内聚:同一业务域参数风格一致;
  • 可观测:处处打日志、埋状态、给进度。

无论你是要集成 RAG 能力,还是参考设计自己的知识管理 API,这套设计都值得借鉴。如果你想了解更多开源项目的实现细节或在技术实现上有任何疑问,欢迎在云栈社区与大家一起交流探讨。




上一篇:京东Oxygen 9N-LLM框架:多框架融合下的大规模生成式推荐训练实践
下一篇:Sanic实战:Python异步Web框架的架构设计与性能调优
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-1 20:46 , Processed in 0.430652 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表