找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

5211

积分

0

好友

692

主题
发表于 1 小时前 | 查看: 2| 回复: 0

通过前面几章的学习,我们已经掌握了 LangChain 的核心模块。现在是时候把所有零件组装起来,打造一个功能完备的端到端应用了。本章我们将构建一个 AI 知识库助手,它具备文档上传、语义检索、多轮对话和流式响应能力,并使用 FastAPI 作为后端框架。

7.1 项目架构设计

我们如何将这些模块串联起来,形成一个可运行的系统?以下是为这个知识库助手设计的架构与目录结构。

项目结构:

knowledge-assistant/
├── main.py              # FastAPI 入口
├── chains.py            # LangChain 链定义
├── knowledge_base.py    # 知识库管理(加载、分割、向量化)
├── memory_store.py      # 对话记忆管理
├── config.py            # 配置(API Key、模型参数)
├── requirements.txt     # 依赖
└── docs/                # 用户上传的文档目录

整个系统的工作流程可以通过下面的架构图来清晰展示:

用户(前端/curl/Postman)
      │
      ▼
┌─────────────────────────────────────────────┐
│              FastAPI 后端                     │
├─────────────────────────────────────────────┤
│                                             │
│  POST /upload    → knowledge_base.py        │
│    上传文档 → 分割 → 向量化 → Chroma        │
│                                             │
│  POST /chat      → chains.py                │
│    问题 → 检索文档 → 拼接prompt → LLM       │
│    ↕ memory_store.py(多轮记忆)              │
│    ↕ 流式SSE响应                             │
│                                             │
├─────────────────────────────────────────────┤
│  Chroma(向量数据库)  │  InMemory(记忆存储)│
└─────────────────────────────────────────────┘

7.2 后端:FastAPI + LangChain 集成

首先,我们需要一个中心化的配置管理,将 API Key、模型选择等参数集中管理。

config.py — 配置管理:

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

# 模型配置
LLM_MODEL = "deepseek-chat"
LLM_API_KEY = os.getenv("DEEPSEEK_API_KEY")

# Embedding 配置
EMBEDDING_MODEL = "text-embedding-3-small"

# 知识库配置
CHROMA_DB_DIR = "./chroma_db"
DOCS_DIR = "./docs"
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50

接下来是应用的主入口,这里我们使用 FastAPI 来构建 RESTful API,处理文件上传和对话请求。

main.py — FastAPI 入口:

# main.py
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from knowledge_base import KnowledgeBase
from chains import create_rag_chain
from memory_store import MemoryStore

app = FastAPI(title="AI 知识库助手")

# 初始化核心组件
kb = KnowledgeBase()
memory = MemoryStore()

class ChatRequest(BaseModel):
    question: str
    session_id: str = "default"

@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
    """上传文档到知识库"""
    content = await file.read()
    file_path = f"./docs/{file.filename}"

    with open(file_path, "wb") as f:
        f.write(content)

    # 向量化并存入数据库
    num_chunks = kb.add_document(file_path)
    return {"message": f"✅ 已添加 {file.filename},生成 {num_chunks} 个向量块"}

@app.post("/chat")
async def chat(req: ChatRequest):
    """对话接口(流式响应)"""
    chain = create_rag_chain(kb.vectorstore, memory.get_history(req.session_id))

    async def generate():
        full_response = ""
        async for chunk in chain.astream({"input": req.question}):
            full_response += chunk
            yield chunk
        # 对话结束后保存记忆
        memory.save_message(req.session_id, req.question, full_response)

    return StreamingResponse(generate(), media_type="text/plain")

@app.get("/health")
async def health():
    return {"status": "ok", "documents": kb.doc_count}

7.3 知识库管理:文档上传与向量化

知识库模块负责文档的加载、文本分割以及向量化存储,这是实现 RAG(检索增强生成)能力的基础。

knowledge_base.py — 文档加载 + 分割 + 向量化:

# knowledge_base.py
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
import config

class KnowledgeBase:
    def __init__(self):
        self.embeddings = OpenAIEmbeddings(model=config.EMBEDDING_MODEL)
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=config.CHUNK_SIZE,
            chunk_overlap=config.CHUNK_OVERLAP,
        )

        # 加载已有的向量数据库(如果存在的话)
        self.vectorstore = Chroma(
            persist_directory=config.CHROMA_DB_DIR,
            embedding_function=self.embeddings,
        )

    def add_document(self, file_path: str) -> int:
        """添加文档到知识库"""
        # 根据文件类型选择加载器
        if file_path.endswith(".pdf"):
            loader = PyPDFLoader(file_path)
        else:
            loader = TextLoader(file_path, encoding="utf-8")

        docs = loader.load()
        chunks = self.splitter.split_documents(docs)

        # 添加到向量数据库
        self.vectorstore.add_documents(chunks)
        return len(chunks)

    @property
    def doc_count(self) -> int:
        return self.vectorstore._collection.count()

7.4 对话接口:流式响应 + 多轮记忆

为了实现连贯的对话体验,我们需要管理会话历史,并构建一个能够整合检索结果和记忆的 LangChain 链。

memory_store.py — 会话记忆管理:

# memory_store.py
from langchain_core.messages import HumanMessage, AIMessage

class MemoryStore:
    """简单的内存会话存储"""

    def __init__(self, max_history=10):
        self.store = {}           # session_id → messages list
        self.max_history = max_history

    def get_history(self, session_id: str) -> list:
        return self.store.get(session_id, [])

    def save_message(self, session_id: str, human_msg: str, ai_msg: str):
        if session_id not in self.store:
            self.store[session_id] = []

        self.store[session_id].extend([
            HumanMessage(content=human_msg),
            AIMessage(content=ai_msg),
        ])

        # 保留最近 N 轮(防止记忆过长)
        if len(self.store[session_id]) > self.max_history * 2:
            self.store[session_id] = self.store[session_id][-self.max_history * 2:]

chains.py — RAG + 记忆链:

# chains.py
from langchain_deepseek import ChatDeepSeek
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
import config

def create_rag_chain(vectorstore, chat_history: list):
    """创建带记忆的 RAG 链"""
    llm = ChatDeepSeek(
        model=config.LLM_MODEL,
        streaming=True,
    )

    retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

    def format_docs(docs):
        return "\n\n---\n\n".join(
            f"【{doc.metadata.get('source', '?')}】\n{doc.page_content}"
            for doc in docs
        )

    prompt = ChatPromptTemplate.from_messages([
        ("system", """你是一个智能知识库助手。请基于参考文档回答问题。
如果文档中找不到答案,请坦诚告知。回答时引用文档来源。

参考文档:
{context}"""),
        MessagesPlaceholder(variable_name="chat_history"),
        ("human", "{input}"),
    ])

    chain = (
        {
            "context": (lambda x: x["input"]) | retriever | format_docs,
            "chat_history": lambda x: chat_history,
            "input": lambda x: x["input"],
        }
        | prompt
        | llm
        | StrOutputParser()
    )

    return chain

测试 API:
完成代码编写后,我们可以通过命令行快速测试整个流程是否畅通。

# 启动服务
uvicorn main:app --reload

# 上传文档
curl -X POST http://localhost:8000/upload \
  -F "file=@./my_notes/python_guide.md"
# {"message": "✅ 已添加 python_guide.md,生成 42 个向量块"}

# 对话(流式)
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"question": "什么是装饰器?", "session_id": "user_001"}'
# 根据文档,装饰器是一种...(流式逐字返回)

7.5 部署与优化:生产环境注意事项

将原型部署到生产环境时,我们需要考虑更多关于安全、性能、可靠性和成本的因素。下面是一份实用的清单。

生产环境 Checklist:

✅ 基础
  □ API Key 用环境变量管理,不硬编码
  □ 添加 CORS 中间件(前端跨域)
  □ 添加 API 认证(Bearer Token / API Key)

✅ 性能
  □ 向量数据库用持久化存储(不要每次重启都重建)
  □ Embedding 调用做缓存(同一文本不重复计算)
  □ 用异步(ainvoke/astream)处理并发请求
  □ 大文件上传用后台任务(BackgroundTask)

✅ 可靠性
  □ LLM 调用加 retry 和 timeout
  □ 添加结构化日志(记录每次问答的输入输出)
  □ 集成 LangSmith 做链路追踪(下一章介绍)

✅ 成本
  □ 监控 token 使用量
  □ 设置每用户/每日的调用限额
  □ 长对话用摘要记忆而非 Buffer 记忆

例如,为 LLM 调用添加超时和重试机制,可以显著提升服务的健壮性:

# 示例:添加超时和重试
from langchain_deepseek import ChatDeepSeek

llm = ChatDeepSeek(
    model="deepseek-chat",
    request_timeout=30,       # 30 秒超时
    max_retries=3,            # 最多重试 3 次
)

第 7 章核心知识回顾

本章我们将之前学到的 LangChain 模块与 Python Web 框架结合,完成了一个实战项目。下表帮你快速回顾各模块对应的技术点:

模块 技术 对应章节
文档上传+向量化 DocumentLoader + Chroma 第 5 章 RAG
RAG 问答链 Retriever + Prompt + LLM 第 5 章 RAG
多轮记忆 MessageHistory 第 4 章 Memory
流式响应 astream + StreamingResponse 第 3 章 Chain
API 服务 FastAPI

至此,你已经掌握了使用 LangChain 构建一个端到端 AI 应用的全流程。从文档处理、向量检索、对话链构建到 API 封装,每一步都至关重要。在实际开发中,你可以根据业务需求对此项目进行扩展,例如支持更多文件格式、集成更复杂的记忆系统或添加用户管理功能。如果想与其他开发者交流此类项目的实战经验,可以到 云栈社区 的相关板块进行探讨。




上一篇:Claude Code 自动化工作流拆解:模型之外,软件工程如何沉淀核心资产
下一篇:探究目的论:为什么你的人生改变总是卡在第二步?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-17 04:04 , Processed in 0.834376 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表