本文深入剖析 QAnything 开源项目的后端服务,包括核心类、接口、逻辑实现,揭示其如何实现高效的检索增强生成(RAG系统,从文档处理到智能问答的完整技术链路。
核心数据结构
| 类 |
一句话职责 |
关键动作 |
输出 |
| LocalFile |
「把一份原始文件变成可检索的向量块」 |
解析 → 分块 → OCR(如有)→ 向量化 → 打包成 (doc, emb) 列表 |
一堆 Document + 对应 embedding |
| LocalDocQA |
「拿这些向量块去回答用户问题」 |
召回 → 重排序 → 截断 → 拼 Prompt → 调 LLM → 流式返回答案 |
生成器,每次 yield 一段答案 + 引用的原文 |
LocalDocQA类
LocalDocQA 是 QAnything 系统的核心引擎,负责协调整个 RAG 流程。该类采用模块化设计,将文档处理、向量存储、语义检索、重排序和 LLM 生成等关键功能有机整合。
核心架构:
class LocalDocQA:
def __init__(self):
self.llm: object = None # LLM 引擎
self.embeddings: object = None # 向量化模型
self.top_k: int = VECTOR_SEARCH_TOP_K # 检索数量
self.chunk_size: int = CHUNK_SIZE # 文本切片大小
self.score_threshold: int = VECTOR_SEARCH_SCORE_THRESHOLD # 相似度阈值
self.milvus_kbs: List[MilvusClient] = [] # Milvus 客户端池
self.milvus_summary: KnowledgeBaseManager = None # 知识库管理
self.mode: str = None # 运行模式
self.local_rerank_service_url = "http://0.0.0.0:8776" # 重排序服务
self.ocr_url = 'http://0.0.0.0:8010/ocr' # OCR 服务
LLM 引擎集成与向量存储
def create_milvus_collection(self, user_id, kb_id, kb_name):
milvus_kb = MilvusClient(self.mode, user_id, [kb_id])
self.milvus_kbs.append(milvus_kb)
self.milvus_summary.new_milvus_base(kb_id, user_id, kb_name)
def match_milvus_kb(self, user_id, kb_ids):
# 智能匹配已存在的 Milvus 客户端
for kb in self.milvus_kbs:
if user_id == kb.user_id and kb_ids == kb.kb_ids:
return kb
# 不存在则创建新的客户端
milvus_kb = MilvusClient(self.mode, user_id, kb_ids)
self.milvus_kbs.append(milvus_kb)
return milvus_kb
文档处理与向量化
async def insert_files_to_milvus(self, user_id, kb_id, local_files: List[LocalFile]):
milvus_kv = self.match_milvus_kb(user_id, [kb_id])
success_list = []
failed_list = []
for local_file in local_files:
try:
# Step 1: 文档解析与切片
local_file.split_file_to_docs(self.get_ocr_result)
content_length = sum([len(doc.page_content) for doc in local_file.docs])
# Step 2: 向量化处理
local_file.create_embedding()
# Step 3: 向量存储
ret = await milvus_kv.insert_files(
local_file.file_id,
local_file.file_name,
local_file.file_path,
local_file.docs,
local_file.embs
)
if ret:
success_list.append(local_file)
self.milvus_summary.update_file_status(local_file.file_id, status='green')
else:
failed_list.append(local_file)
self.milvus_summary.update_file_status(local_file.file_id, status='yellow')
except Exception as e:
# 详细的错误处理和状态更新
self.milvus_summary.update_file_status(local_file.file_id, status='red')
failed_list.append(local_file)
智能检索与重排序
def get_source_documents(self, queries, milvus_kb, cosine_thresh=None, top_k=None):
if not top_k:
top_k = self.top_k
# 1. 查询向量化
embs = self.embeddings._get_len_safe_embeddings(queries)
# 2. 批量向量搜索
t1 = time.time()
batch_result = milvus_kb.search_emb_async(embs=embs, top_k=top_k, queries=queries)
t2 = time.time()
debug_logger.info(f"milvus search time: {t2 - t1}")
# 3. 结果整理与元数据增强
source_documents = []
for query, query_docs in zip(queries, batch_result):
for doc in query_docs:
doc.metadata['retrieval_query'] = query
doc.metadata['embed_version'] = self.embeddings.embed_version
source_documents.append(doc)
# 4. 相似度过滤
if cosine_thresh:
source_documents = [item for item in source_documents
if float(item.metadata['score']) > cosine_thresh]
return source_documents
def rerank_documents_for_local(self, query, source_documents):
# 查询长度限制:超过300 tokens 跳过重排序
LLM集成与答案生成
@get_time
def get_knowledge_based_answer(self, query, milvus_kb, chat_history=None,
streaming: bool = STREAMING, rerank: bool = False):
if chat_history is None:
chat_history = []
retrieval_queries = [query]
# 1. 文档检索
source_documents = self.get_source_documents(retrieval_queries, milvus_kb)
deduplicated_docs = self.deduplicate_documents(source_documents)
retrieval_documents = sorted(deduplicated_docs,
key=lambda x: x.metadata['score'],
reverse=True)
# 2. 重排序(可选)
if rerank and len(retrieval_documents) > 1:
retrieval_documents = self.rerank_documents(query, retrieval_documents)
# 3. 上下文处理
source_documents = self.reprocess_source_documents(
query=query,
source_docs=retrieval_documents,
history=chat_history,
prompt_template=PROMPT_TEMPLATE
)
# 4. Prompt 生成
prompt = self.generate_prompt(
query=query,
source_docs=source_documents,
prompt_template=PROMPT_TEMPLATE
)
# 5. LLM 生成答案(支持流式)
t1 = time.time()
for answer_result in self.llm.generatorAnswer(
prompt=prompt,
history=chat_history,
streaming=streaming
):
resp = answer_result.llm_output["answer"]
prompt = answer_result.prompt
history = answer_result.history
# 确保历史记录正确
history[-1][0] = query
response = {
"query": query,
"prompt": prompt,
"result": resp,
"retrieval_documents": retrieval_documents,
"source_documents": source_documents
}
yield response, history
t2 = time.time()
debug_logger.info(f"LLM time: {t2 - t1}")
LocalFile类
LocalFile 是 QAnything 的文档「预处理工厂」,它只做三件事,却决定了后续检索与问答的质量天花板:
1、吃 进来一个文件(或 URL);
2、拆 成语义完整的文本块;
3、吐 出带嵌入向量的 List,供 LocalDocQA 直接入库与检索。
class LocalFile:
def __init__(self, user_id, kb_id, file, file_id, file_name, embedding, ...):
# 保存上传文件到本地;记录 user/kb/文件元数据
...
def split_file_to_docs(self, ocr_engine: Callable, ...):
# 按扩展名选 Loader → 中文切分 → 二次递归切分 → 注入 metadata
...
def create_embedding(self):
# 批量生成 embedding,与 docs 顺序严格对齐
self.embs = self.emb_infer._get_len_safe_embeddings(
[doc.page_content for doc in self.docs])
QA后端接口实现
知识库
1、创建知识库
接收 user_id + kb_name
├─ MySQL 插入一条 kb 记录(状态 initializing)
├─ LocalDocQA.create_milvus_collection() → Milvus 创建分区
└─ 返回 kb_id(UUID)
- 幂等:同名库重复调用会返回已存在 kb_id。
- 事务:MySQL 成功才创建 Milvus 分区,保证元数据与向量存储一致。
2、列表、重命名、删除
- 列表:带分页、总条数,支持按 updated_at 排序。
- 重命名:仅更新 MySQL 字段,Milvus 分区名不变(解耦)。
- 删除:
a.级联删除 Milvus 分区数据;
b.删除本地文件目录;
c.删除 MySQL 记录;
d.异步清理客户端缓存;
文件操作
1、本地文件、网页上传
async def upload_files(request):
user_id = request.form.get('user_id')
kb_id = request.form.get('kb_id')
files = request.files.getlist('files')
local_files = []
for f in files:
lf = LocalFile(user_id, kb_id, f, file_id=uuid4(),
file_name=f.name, embedding=embed_instance)
local_files.append(lf)
await local_doc_qa.insert_files_to_milvus(user_id, kb_id, local_files)
return json({'success': len(success_list), 'failed': len(failed_list)})
多文件并发解析,异常单文件粒度捕获,失败文件写 status=red。 秒级返回,解析与向量化后置到后台线程(对前端无阻塞)。
2、列表、删除、其它
| 端点 |
亮点 |
/list_files |
分页 + 状态过滤(green/yellow/red),前端实时展示解析进度 |
/delete_files |
支持批量 file_id,事务内同时删 Milvus 向量 & 磁盘文件 |
/get_total_status |
一条 SQL 聚合出「库/文件/异常」总数,供管理看板 |
/clean_files_by_status |
定时任务入口,一键回收失败文件占用的向量空间 |
QA流程
对话接口 /local_doc_chat —— RAG 灵魂
async def local_doc_chat(request):
req = request.json
async for chunk, history in local_doc_qa.get_knowledge_based_answer(
req['question'], milvus_kb, streaming=req['streaming']):
yield f"data: {json.dumps(chunk)}\n\n"
用户问题 → Embedding → Milvus 粗排(top-k=40)→ BCE Reranker 精排 → 阈值过滤 → 拼 Prompt → LLM 流式回答
总结
QAnything 用 14 个接口勾勒出「知识库 → 文件 → 向量 → 对话」的完整闭环:
- 零冗余:每个端点只干一件事;
- 高内聚:同一业务域参数风格一致;
- 可观测:处处打日志、埋状态、给进度。
无论你是要集成 RAG 能力,还是参考设计自己的知识管理 API,这套设计都值得借鉴。如果你想了解更多开源项目的实现细节或在技术实现上有任何疑问,欢迎在云栈社区与大家一起交流探讨。