通过前面几章的学习,我们已经掌握了 LangChain 的核心模块。现在是时候把所有零件组装起来,打造一个功能完备的端到端应用了。本章我们将构建一个 AI 知识库助手,它具备文档上传、语义检索、多轮对话和流式响应能力,并使用 FastAPI 作为后端框架。
7.1 项目架构设计
我们如何将这些模块串联起来,形成一个可运行的系统?以下是为这个知识库助手设计的架构与目录结构。
项目结构:
knowledge-assistant/
├── main.py # FastAPI 入口
├── chains.py # LangChain 链定义
├── knowledge_base.py # 知识库管理(加载、分割、向量化)
├── memory_store.py # 对话记忆管理
├── config.py # 配置(API Key、模型参数)
├── requirements.txt # 依赖
└── docs/ # 用户上传的文档目录
整个系统的工作流程可以通过下面的架构图来清晰展示:
用户(前端/curl/Postman)
│
▼
┌─────────────────────────────────────────────┐
│ FastAPI 后端 │
├─────────────────────────────────────────────┤
│ │
│ POST /upload → knowledge_base.py │
│ 上传文档 → 分割 → 向量化 → Chroma │
│ │
│ POST /chat → chains.py │
│ 问题 → 检索文档 → 拼接prompt → LLM │
│ ↕ memory_store.py(多轮记忆) │
│ ↕ 流式SSE响应 │
│ │
├─────────────────────────────────────────────┤
│ Chroma(向量数据库) │ InMemory(记忆存储)│
└─────────────────────────────────────────────┘
7.2 后端:FastAPI + LangChain 集成
首先,我们需要一个中心化的配置管理,将 API Key、模型选择等参数集中管理。
config.py — 配置管理:
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
# 模型配置
LLM_MODEL = "deepseek-chat"
LLM_API_KEY = os.getenv("DEEPSEEK_API_KEY")
# Embedding 配置
EMBEDDING_MODEL = "text-embedding-3-small"
# 知识库配置
CHROMA_DB_DIR = "./chroma_db"
DOCS_DIR = "./docs"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
接下来是应用的主入口,这里我们使用 FastAPI 来构建 RESTful API,处理文件上传和对话请求。
main.py — FastAPI 入口:
# main.py
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from knowledge_base import KnowledgeBase
from chains import create_rag_chain
from memory_store import MemoryStore
app = FastAPI(title="AI 知识库助手")
# 初始化核心组件
kb = KnowledgeBase()
memory = MemoryStore()
class ChatRequest(BaseModel):
question: str
session_id: str = "default"
@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
"""上传文档到知识库"""
content = await file.read()
file_path = f"./docs/{file.filename}"
with open(file_path, "wb") as f:
f.write(content)
# 向量化并存入数据库
num_chunks = kb.add_document(file_path)
return {"message": f"✅ 已添加 {file.filename},生成 {num_chunks} 个向量块"}
@app.post("/chat")
async def chat(req: ChatRequest):
"""对话接口(流式响应)"""
chain = create_rag_chain(kb.vectorstore, memory.get_history(req.session_id))
async def generate():
full_response = ""
async for chunk in chain.astream({"input": req.question}):
full_response += chunk
yield chunk
# 对话结束后保存记忆
memory.save_message(req.session_id, req.question, full_response)
return StreamingResponse(generate(), media_type="text/plain")
@app.get("/health")
async def health():
return {"status": "ok", "documents": kb.doc_count}
7.3 知识库管理:文档上传与向量化
知识库模块负责文档的加载、文本分割以及向量化存储,这是实现 RAG(检索增强生成)能力的基础。
knowledge_base.py — 文档加载 + 分割 + 向量化:
# knowledge_base.py
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
import config
class KnowledgeBase:
def __init__(self):
self.embeddings = OpenAIEmbeddings(model=config.EMBEDDING_MODEL)
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=config.CHUNK_SIZE,
chunk_overlap=config.CHUNK_OVERLAP,
)
# 加载已有的向量数据库(如果存在的话)
self.vectorstore = Chroma(
persist_directory=config.CHROMA_DB_DIR,
embedding_function=self.embeddings,
)
def add_document(self, file_path: str) -> int:
"""添加文档到知识库"""
# 根据文件类型选择加载器
if file_path.endswith(".pdf"):
loader = PyPDFLoader(file_path)
else:
loader = TextLoader(file_path, encoding="utf-8")
docs = loader.load()
chunks = self.splitter.split_documents(docs)
# 添加到向量数据库
self.vectorstore.add_documents(chunks)
return len(chunks)
@property
def doc_count(self) -> int:
return self.vectorstore._collection.count()
7.4 对话接口:流式响应 + 多轮记忆
为了实现连贯的对话体验,我们需要管理会话历史,并构建一个能够整合检索结果和记忆的 LangChain 链。
memory_store.py — 会话记忆管理:
# memory_store.py
from langchain_core.messages import HumanMessage, AIMessage
class MemoryStore:
"""简单的内存会话存储"""
def __init__(self, max_history=10):
self.store = {} # session_id → messages list
self.max_history = max_history
def get_history(self, session_id: str) -> list:
return self.store.get(session_id, [])
def save_message(self, session_id: str, human_msg: str, ai_msg: str):
if session_id not in self.store:
self.store[session_id] = []
self.store[session_id].extend([
HumanMessage(content=human_msg),
AIMessage(content=ai_msg),
])
# 保留最近 N 轮(防止记忆过长)
if len(self.store[session_id]) > self.max_history * 2:
self.store[session_id] = self.store[session_id][-self.max_history * 2:]
chains.py — RAG + 记忆链:
# chains.py
from langchain_deepseek import ChatDeepSeek
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
import config
def create_rag_chain(vectorstore, chat_history: list):
"""创建带记忆的 RAG 链"""
llm = ChatDeepSeek(
model=config.LLM_MODEL,
streaming=True,
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
def format_docs(docs):
return "\n\n---\n\n".join(
f"【{doc.metadata.get('source', '?')}】\n{doc.page_content}"
for doc in docs
)
prompt = ChatPromptTemplate.from_messages([
("system", """你是一个智能知识库助手。请基于参考文档回答问题。
如果文档中找不到答案,请坦诚告知。回答时引用文档来源。
参考文档:
{context}"""),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
])
chain = (
{
"context": (lambda x: x["input"]) | retriever | format_docs,
"chat_history": lambda x: chat_history,
"input": lambda x: x["input"],
}
| prompt
| llm
| StrOutputParser()
)
return chain
测试 API:
完成代码编写后,我们可以通过命令行快速测试整个流程是否畅通。
# 启动服务
uvicorn main:app --reload
# 上传文档
curl -X POST http://localhost:8000/upload \
-F "file=@./my_notes/python_guide.md"
# {"message": "✅ 已添加 python_guide.md,生成 42 个向量块"}
# 对话(流式)
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"question": "什么是装饰器?", "session_id": "user_001"}'
# 根据文档,装饰器是一种...(流式逐字返回)
7.5 部署与优化:生产环境注意事项
将原型部署到生产环境时,我们需要考虑更多关于安全、性能、可靠性和成本的因素。下面是一份实用的清单。
生产环境 Checklist:
✅ 基础
□ API Key 用环境变量管理,不硬编码
□ 添加 CORS 中间件(前端跨域)
□ 添加 API 认证(Bearer Token / API Key)
✅ 性能
□ 向量数据库用持久化存储(不要每次重启都重建)
□ Embedding 调用做缓存(同一文本不重复计算)
□ 用异步(ainvoke/astream)处理并发请求
□ 大文件上传用后台任务(BackgroundTask)
✅ 可靠性
□ LLM 调用加 retry 和 timeout
□ 添加结构化日志(记录每次问答的输入输出)
□ 集成 LangSmith 做链路追踪(下一章介绍)
✅ 成本
□ 监控 token 使用量
□ 设置每用户/每日的调用限额
□ 长对话用摘要记忆而非 Buffer 记忆
例如,为 LLM 调用添加超时和重试机制,可以显著提升服务的健壮性:
# 示例:添加超时和重试
from langchain_deepseek import ChatDeepSeek
llm = ChatDeepSeek(
model="deepseek-chat",
request_timeout=30, # 30 秒超时
max_retries=3, # 最多重试 3 次
)
第 7 章核心知识回顾
本章我们将之前学到的 LangChain 模块与 Python Web 框架结合,完成了一个实战项目。下表帮你快速回顾各模块对应的技术点:
| 模块 |
技术 |
对应章节 |
| 文档上传+向量化 |
DocumentLoader + Chroma |
第 5 章 RAG |
| RAG 问答链 |
Retriever + Prompt + LLM |
第 5 章 RAG |
| 多轮记忆 |
MessageHistory |
第 4 章 Memory |
| 流式响应 |
astream + StreamingResponse |
第 3 章 Chain |
| API 服务 |
FastAPI |
— |
至此,你已经掌握了使用 LangChain 构建一个端到端 AI 应用的全流程。从文档处理、向量检索、对话链构建到 API 封装,每一步都至关重要。在实际开发中,你可以根据业务需求对此项目进行扩展,例如支持更多文件格式、集成更复杂的记忆系统或添加用户管理功能。如果想与其他开发者交流此类项目的实战经验,可以到 云栈社区 的相关板块进行探讨。