如果说 ChatGPT 更像“对话式生成器”,Perplexity 更像“带证据链的答案引擎”,那么企业真正需要的,往往是第三种系统:既能接入公网搜索,也能接入内部知识库;既能流式回答,也能给出可追溯来源;既要低成本本地化部署,也要扛住高并发、可观测、可扩展。
本文不再停留在“搜索 + LLM = AI 搜索”这一层,而是从架构师视角,完整拆解一套可落地、可演进、可生产部署的本地 AI 搜索引擎方案。在云栈社区,我们一直探讨如何将前沿的AI技术真正工程化落地,而这套方案正是对“LLM应用”与“后端 & 架构”深度结合的一次完整实践。
一、为什么企业真正需要的是“本地 AI 搜索引擎”
很多团队一开始会把 AI 搜索想简单了:
- 给大模型一个问题
- 调用搜索引擎拿几个网页
- 把网页正文拼进 Prompt
- 让模型输出答案
这个 Demo 在技术分享里没问题,但一旦放到真实业务,就会迅速暴露四类系统性问题。
1.1 数据合规问题
企业知识往往来自:
- 内部 Wiki
- 研发文档
- 运维 SOP
- 代码仓库
- 事故复盘
- 客服知识库
- 合同、制度、流程文档
这些内容通常不能直接上传到公网模型。尤其在医疗、金融、政企、制造场景,数据出域本身就不被允许。
1.2 搜索质量问题
单纯“搜网页 + 拼上下文”会遇到:
- 召回不全:只靠单路搜索,容易漏掉关键资料
- 相关性不稳:标题匹配但正文无效
- 冗余严重:多个页面内容重复,浪费上下文窗口
- 幻觉风险高:模型把“检索片段”脑补成“结论”
1.3 延迟与吞吐问题
一个能真正上线的 AI 搜索系统,必须回答以下问题:
- 首 Token 延迟能否控制在 2s 以内
- 单机或单集群能否承载百级到千级并发
- 检索、抽取、重排、生成这些阶段是否并行
- 热点问题是否有缓存
- 模型冷启动和上下文过长是否被治理
1.4 工程治理问题
当系统开始服务真实用户时,架构重点就从“能不能回答”转向:
- 可观测:哪一步慢,哪一步错
- 可回放:为什么这个答案这样生成
- 可灰度:新模型、新检索策略如何上线
- 可隔离:搜索服务抖动时,生成链路能否降级
- 可扩展:后续如何接入内部知识库、权限体系、多租户
所以,从企业视角看,本地 AI 搜索引擎的本质不是一个 Prompt 工程 Demo,而是一个“检索增强型答案系统”。
二、先给结论:一套生产级 AI 搜索引擎应该长什么样
2.1 核心能力清单
一套可上线的本地 AI 搜索引擎,至少要包含下面这些能力:
- 混合召回:公网搜索 + 内部知识库 + 向量检索 + 关键词检索
- 查询重写:把自然语言问题改写成更适合搜索的形式
- 多阶段排序:粗排、精排、去重、时效性加权
- 上下文治理:Chunk 切分、Token 预算、窗口裁剪、引用编号
- 流式回答:SSE/WebSocket 输出,降低用户感知等待
- 结果缓存:搜索结果缓存、抽取正文缓存、答案缓存
- 并发治理:限流、连接池、熔断、隔离舱、降级
- 可观测性:trace、metrics、日志、回放
- 多模型演进:推理模型、Embedding 模型、Rerank 模型解耦
2.2 推荐总体架构
┌─────────────────────┐
│ Web / APP │
└──────────┬───────────┘
│
┌──────────▼───────────┐
│ API Gateway / BFF │
│ Auth / Quota / CORS │
└──────────┬───────────┘
│
┌─────────────────────▼─────────────────┐
│ Search Orchestrator │
│ Query Rewrite / Fanout / Merge │
└───────┬──────────┬──────────┬─────────┘
│ │ │
┌───────────▼───┐ ┌────▼────┐ ┌──▼──────────┐
│ Web Search │ │ Vector │ │ BM25 / FTS │
│ SearXNG │ │ Store │ │ OpenSearch │
└───────────┬───┘ └────┬────┘ └──┬──────────┘
│ │ │
└──────┬───┴──────┬───┘
│ │
┌───────────▼──────────▼───────┐
│ Recall Merge + Rerank │
│ Dedup / Freshness / Score │
└───────────┬──────────────────┘
│
┌───────────▼──────────┐
│ Context Builder │
│ Chunk / Budget │
└───────────┬──────────┘
│
┌───────────────▼──────────────┐
│ LLM Answer Service │
│ Ollama / vLLM / llama.cpp │
└───────────────┬──────────────┘
│
┌───────────▼──────────┐
│ SSE / Streaming │
└──────────────────────┘
旁路支撑能力:
- Redis:缓存、幂等、热点保护
- PostgreSQL:审计、会话、检索记录、离线评测
- Prometheus + Grafana:观测
- Kafka / RabbitMQ:异步入库、索引更新
2.3 一句话理解这套架构
它不是“让 LLM 去搜索”,而是“由检索编排层先构造证据,再让 LLM 基于证据生成答案”。
这个差异决定了系统的可控性和可生产性。
三、核心原理:为什么 AI 搜索不是简单的 RAG
3.1 RAG 的基本流程
RAG 的经典链路是:
- 用户提问
- 从知识库召回相关内容
- 将内容拼入 Prompt
- 大模型基于检索结果生成答案
这个流程没错,但在生产环境中,真正决定效果的不是“有没有检索”,而是以下四个工程细节。
3.2 Query Rewrite:用户问题通常不适合直接检索
用户会这样问:
- “我们上次线上支付超时怎么排查的?”
- “为什么 Java 服务在 k8s 里 RSS 很高?”
- “本地知识库和公网搜索结果冲突时应该信谁?”
这些问题直接拿去做搜索,经常命中不准。原因很简单:自然语言问题不等于高质量检索语句。
生产级做法通常是把用户问题拆成三个视角:
- 原始问题:保留用户真实意图
- 搜索重写:提炼关键词、限定领域、补齐专业术语
- 多跳问题:复杂问题拆成多个子问题并行召回
例如:
原问题:
为什么 Java 服务在 k8s 里 RSS 很高?
重写后:
Kubernetes Java RSS memory high container memory usage heap off-heap page cache
子问题:
1. Java 进程 RSS 由哪些部分组成
2. 容器内 RSS 和 JVM Heap 的关系
3. k8s 场景下如何排查 off-heap / mmap / page cache
这一步会直接提升召回质量。
3.3 Recall:单路召回在企业场景一定不够
生产系统通常不只一条召回链路,而是多路并行:
- Web Search:适合查最新信息、公开资料、社区经验
- BM25/全文检索:适合精准关键词命中
- Vector Search:适合语义相似问题和口语化表达
- Metadata Filter:适合按部门、项目、时间、文档类型筛选
为什么要混合召回?
- 关键词检索的精确性高,但语义泛化弱
- 向量检索的召回广,但容易“像但不对”
- 公网搜索时效性强,但可信度不稳定
- 内部知识库可信度高,但覆盖范围有限
所以最常见的正确姿势是:
关键词召回保精度 + 向量召回保覆盖 + 公网召回补时效
3.4 Rerank:召回出来不代表能直接用
很多 Demo 在召回后直接把 TopK 拼进 Prompt。问题在于:
- TopK 里可能有重复页面
- 片段可能和问题只“部分相关”
- 多个来源之间可能互相冲突
- 一些片段虽然相关,但信息密度极低
因此需要至少两级排序:
- 粗排:关键词得分、来源权重、时间衰减、点击先验
- 精排:Cross-Encoder Rerank,对“问题-文档片段”做成对打分
Cross-Encoder 的本质是:不是单独编码 query 和 document 再算余弦,而是把两者一起输入模型,让模型直接判断“这段内容到底能不能回答这个问题”。
在复杂问题里,这一步的提升通常比“换更大的生成模型”更明显。
3.5 Grounded Generation:答案必须被证据约束
生产级 AI 搜索和普通聊天机器人最大的区别,不是答案更长,而是答案更“可证伪”。
推荐的生成约束包括:
- 仅依据提供的参考资料回答
- 资料不足时明确说明不确定
- 结论后标注引用编号
- 输出“答案 + 证据来源 + 不确定性声明”
也就是说,我们需要的是:
answerable 比 fluent 更重要
faithful 比 creative 更重要
四、生产级架构设计:从 Demo 到可上线系统
4.1 分层架构
推荐把系统拆成 6 层。
接入层
职责:
- 鉴权
- 限流
- 请求校验
- SSE / WebSocket 流式输出
- 租户隔离
可选技术:
- Nginx / Kong / APISIX
- FastAPI / Starlette
编排层
职责:
- 查询重写
- 多路召回 fanout
- 召回合并
- 结果去重
- 上下文构建
- 答案生成工作流编排
这一层是整个系统的大脑。
检索层
职责:
- 外部搜索引擎访问
- 内部知识库召回
- 向量检索
- 关键词检索
- 元数据过滤
模型层
职责:
建议分离部署,而不是混在一个 Python 进程里。
数据层
职责:
- 文档原文存储
- 向量索引
- 倒排索引
- 缓存
- 会话和审计
治理层
职责:
4.2 在线链路时序图
(图略,建议补充系统交互时序图)
4.3 离线入库链路
在线搜索只是一半,企业真正可持续的价值来自“知识资产化”。
离线链路通常包括:
- 文档采集:Wiki、PDF、Markdown、代码仓库、数据库、SOP
- 内容解析:OCR、正文提取、表格抽取、代码块保留
- 文本标准化:去噪、清洗、统一编码
- Chunk 切分:按标题、语义、长度切块
- 元数据注入:租户、部门、项目、版本、时间、权限
- Embedding:生成向量
- 索引写入:向量库 + 倒排索引
- 版本管理:支持回滚和双索引切换
线上效果稳定的关键,往往不在 LLM,而在这条离线链路做得是否扎实。
五、工程化升级重点:高并发、可扩展、可治理
5.1 并发模型:不是所有步骤都应该串行
一次查询大致包含这些阶段:
- 请求校验
- 查询改写
- 多路召回
- 正文抽取
- 重排
- 上下文构建
- LLM 推理
其中最适合并行化的是:
其中最需要限流的是:
- LLM 推理
- Rerank 模型
- 外部搜索引擎访问
推荐的原则:
- I/O 密集阶段异步并发
- GPU/CPU 重计算阶段显式信号量控制
- 慢依赖通过超时、隔离舱和熔断保护
5.2 缓存设计:热点问题是天然红利
AI 搜索的缓存不是单一缓存,而是多层缓存。
L1:进程内缓存
适合缓存:
- 热门 query 的改写结果
- 热门 query 的召回结果
- 常见文档正文抽取结果
特点:
适合缓存:
缓存 Key 设计建议
不要只用 query 当 key,建议带上:
- query hash
- tenant id
- model id
- retrieval profile
- topk
- prompt version
否则一旦 Prompt 或召回策略升级,缓存会污染新结果。
5.3 限流与熔断
生产环境中,最常见的两个崩法是:
- 突发并发把 Ollama 打满,排队雪崩
- 外部搜索引擎超时,把整个请求链路拖死
推荐治理策略:
- API 层限流:按租户、按用户、按 IP 配额
- LLM 层并发信号量:限制同时生成数
- 外部搜索熔断:连续失败后短时间走降级
- 摘要/精排降级:超时时直接跳过,不阻塞主链路
5.4 多模型解耦
千万不要把“一个模型解决所有问题”当成默认设计。
推荐解耦成三类模型:
- Embedding 模型:负责语义召回
- Rerank 模型:负责问题-片段精排
- Generation 模型:负责最终答案生成
这样做的好处是:
- 可分别扩容
- 可分别灰度
- 可根据成本做替换
- 可以针对中文场景和英文场景独立优化
5.5 多租户与权限控制
企业知识库接入后,权限是必须做的,而不是“以后再说”。
最少要支持:
检索时的权限过滤必须前置在召回阶段,而不是等答案生成后再做脱敏。
因为一旦召回阶段就把不该看的内容送进模型,上下文已经泄漏了。
六、项目结构设计:一份更接近生产环境的代码组织
local-ai-search/
├── app/
│ ├── main.py
│ ├── config.py
│ ├── bootstrap.py
│ ├── api/
│ │ ├── deps.py
│ │ ├── routes_health.py
│ │ └── routes_search.py
│ ├── domain/
│ │ ├── models.py
│ │ ├── enums.py
│ │ └── contracts.py
│ ├── application/
│ │ ├── query_rewrite_service.py
│ │ ├── retrieval_service.py
│ │ ├── answer_service.py
│ │ ├── context_service.py
│ │ └── orchestrator.py
│ ├── infrastructure/
│ │ ├── cache/
│ │ │ ├── memory_cache.py
│ │ │ └── redis_cache.py
│ │ ├── llm/
│ │ │ └── ollama_client.py
│ │ ├── retrieval/
│ │ │ ├── searxng_client.py
│ │ │ ├── vector_store.py
│ │ │ ├── keyword_store.py
│ │ │ └── rerank_client.py
│ │ ├── parser/
│ │ │ └── content_extractor.py
│ │ └── observability/
│ │ ├── metrics.py
│ │ └── tracing.py
│ └── shared/
│ ├── retry.py
│ ├── hashing.py
│ ├── token_budget.py
│ └── jsonlog.py
├── scripts/
│ ├── ingest_docs.py
│ └── evaluate_rag.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md
这样分层的意义是:
domain 只放业务对象和接口抽象
application 放编排逻辑
infrastructure 放具体实现
后续如果从 Ollama 切到 vLLM、从 Chroma 切到 Milvus,改动会局部很多。
七、核心数据模型设计
7.1 Pydantic 模型
# app/domain/models.py
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, HttpUrl
class RetrievalSource(str, Enum):
WEB = "web"
INTERNAL = "internal"
VECTOR = "vector"
KEYWORD = "keyword"
class SearchRequest(BaseModel):
query: str = Field(..., min_length=1, max_length=1000)
tenant_id: str = Field(..., min_length=1, max_length=64)
user_id: str = Field(..., min_length=1, max_length=64)
session_id: str | None = Field(default=None, max_length=128)
stream: bool = True
top_k: int = Field(default=8, ge=1, le=20)
enable_web_search: bool = True
enable_internal_search: bool = True
retrieval_profile: str = Field(default="balanced", pattern="^(balanced|fast|deep)$")
class RetrievedChunk(BaseModel):
chunk_id: str
doc_id: str
title: str
content: str
url: HttpUrl | None = None
source: RetrievalSource
score: float = 0.0
metadata: dict[str, Any] = Field(default_factory=dict)
class Citation(BaseModel):
index: int
title: str
url: HttpUrl | None = None
source: RetrievalSource
class SearchAnswer(BaseModel):
answer: str
citations: list[Citation] = Field(default_factory=list)
latency_ms: int = 0
trace_id: str
degraded: bool = False
debug: dict[str, Any] = Field(default_factory=dict)
7.2 为什么要显式建模 RetrievedChunk
因为生产环境的排序、过滤、审计都要围绕 chunk,而不是原始 document。
原因包括:
- 一个长文档只能局部相关
- chunk 便于引用编号
- chunk 粒度更适合重排
- chunk 更适合构建 token 预算
八、配置设计:让系统能跑在开发、测试、生产三个环境
# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
app_name: str = "local-ai-search"
env: str = "dev"
host: str = "0.0.0.0"
port: int = 8000
ollama_base_url: str = "http://ollama:11434"
ollama_chat_model: str = "qwen2.5:7b-instruct-q4_K_M"
ollama_embed_model: str = "bge-m3:latest"
ollama_timeout_seconds: int = 120
llm_max_concurrency: int = 4
searxng_base_url: str = "http://searxng:8080"
web_search_timeout_ms: int = 2500
redis_url: str = "redis://redis:6379/0"
postgres_dsn: str = "postgresql+psycopg://postgres:postgres@postgres:5432/aisearch"
max_context_tokens: int = 6000
max_chunk_tokens: int = 500
chunk_overlap_tokens: int = 80
retrieval_timeout_ms: int = 3000
rerank_timeout_ms: int = 1500
cache_ttl_search_seconds: int = 300
cache_ttl_answer_seconds: int = 180
trace_enabled: bool = True
metrics_enabled: bool = True
log_level: str = "INFO"
settings = Settings()
这里的设计重点不是字段多,而是:
- 超时独立可配
- 模型独立可配
- 上下文预算独立可配
- 缓存 TTL 独立可配
后期调优会非常频繁,配置项必须充分外置。
九、检索链路设计:生产系统的真正核心
9.1 查询重写服务
# app/application/query_rewrite_service.py
from __future__ import annotations
import re
from dataclasses import dataclass
@dataclass(slots=True)
class RewriteResult:
original_query: str
search_queries: list[str]
intent: str
class QueryRewriteService:
"""
轻量版查询改写器。
生产环境可替换为小模型改写、规则引擎或领域词典增强。
"""
def rewrite(self, query: str) -> RewriteResult:
normalized = re.sub(r"\s+", " ", query.strip())
lowered = normalized.lower()
search_queries = [normalized]
if "k8s" in lowered:
search_queries.append(normalized.replace("k8s", "kubernetes"))
if "rss" in lowered:
search_queries.append(f"{normalized} off-heap mmap page cache")
if "排查" in normalized:
search_queries.append(f"{normalized} runbook troubleshooting")
# 去重保序
deduped = list(dict.fromkeys(search_queries))
intent = "diagnosis" if "排查" in normalized or "为什么" in normalized else "qa"
return RewriteResult(
original_query=normalized,
search_queries=deduped,
intent=intent,
)
这个实现不复杂,但已经体现了一条重要原则:
查询重写服务本质上是一个可演进策略点
你可以先用规则,再换小模型,再叠加领域词典。
9.2 召回聚合服务
# app/application/retrieval_service.py
from __future__ import annotations
import asyncio
from typing import Iterable
from app.domain.models import RetrievedChunk, RetrievalSource
class RetrievalService:
def __init__(self, web_client, vector_store, keyword_store, rerank_client):
self.web_client = web_client
self.vector_store = vector_store
self.keyword_store = keyword_store
self.rerank_client = rerank_client
async def retrieve(
self,
query: str,
search_queries: list[str],
top_k: int,
enable_web_search: bool,
enable_internal_search: bool,
) -> list[RetrievedChunk]:
tasks = []
if enable_web_search:
tasks.append(self._safe_web_search(search_queries))
if enable_internal_search:
tasks.append(self._safe_vector_search(query, top_k * 3))
tasks.append(self._safe_keyword_search(query, top_k * 3))
recall_results = await asyncio.gather(*tasks, return_exceptions=True)
merged = []
for result in recall_results:
if isinstance(result, Exception):
continue
merged.extend(result)
deduped = self._deduplicate(merged)
reranked = await self.rerank_client.rerank(query, deduped, top_k=top_k)
return reranked
async def _safe_web_search(self, queries: list[str]) -> list[RetrievedChunk]:
results = await self.web_client.multi_search(queries)
return [
RetrievedChunk(
chunk_id=item["id"],
doc_id=item["doc_id"],
title=item["title"],
content=item["content"],
url=item.get("url"),
source=RetrievalSource.WEB,
score=float(item.get("score", 0.0)),
metadata=item.get("metadata", {}),
)
for item in results
]
async def _safe_vector_search(self, query: str, limit: int) -> list[RetrievedChunk]:
return await self.vector_store.search(query, limit=limit)
async def _safe_keyword_search(self, query: str, limit: int) -> list[RetrievedChunk]:
return await self.keyword_store.search(query, limit=limit)
def _deduplicate(self, chunks: Iterable[RetrievedChunk]) -> list[RetrievedChunk]:
seen = set()
output: list[RetrievedChunk] = []
for chunk in chunks:
dedup_key = (chunk.doc_id, chunk.content[:120])
if dedup_key in seen:
continue
seen.add(dedup_key)
output.append(chunk)
return output
这里的关键点有三个:
- 多路召回并行执行
- 单路失败不拖垮整体
- 合并后再统一精排
9.3 Rerank 服务
# app/infrastructure/retrieval/rerank_client.py
from __future__ import annotations
from app.domain.models import RetrievedChunk
class RerankClient:
"""
示例实现:先融合规则分,再预留 cross-encoder 精排接口。
生产环境推荐接 BGE Reranker / jina-reranker / bce-reranker。
"""
async def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
query_terms = set(query.lower().split())
for chunk in chunks:
title_text = f"{chunk.title} {chunk.content[:500]}".lower()
lexical_score = sum(1 for term in query_terms if term in title_text) / max(len(query_terms), 1)
freshness_boost = 0.0
if chunk.metadata.get("updated_at"):
freshness_boost = 0.05
source_boost = 0.08 if chunk.source.value == "internal" else 0.03
chunk.score = chunk.score * 0.5 + lexical_score * 0.35 + freshness_boost + source_boost
chunks.sort(key=lambda item: item.score, reverse=True)
return chunks[:top_k]
真实线上系统里,一般会采用:
- 召回阶段 Top50 或 Top100
- 精排后截断到 Top5 或 Top8
这个比例比较常见,因为:
- 召回太少,容易漏信息
- 上下文太多,会让模型分心、延迟上升、成本变高
十、上下文构建:决定生成质量的最后一道关
很多时候,模型答偏并不是模型不行,而是上下文构建做得差。
10.1 上下文构建原则
- 不拼冗余内容
- 不拼低信息密度内容
- 不拼冲突且未标记来源的内容
- 给每段上下文编号,方便引用
- 控制总 Token,预留回答空间
10.2 生产级 Context Builder
# app/application/context_service.py
from __future__ import annotations
from dataclasses import dataclass
import tiktoken
from app.config import settings
from app.domain.models import RetrievedChunk
@dataclass(slots=True)
class PromptPackage:
system_prompt: str
user_prompt: str
citations: list[dict]
class ContextService:
def __init__(self) -> None:
self.encoding = tiktoken.get_encoding("cl100k_base")
def build_prompt(self, query: str, chunks: list[RetrievedChunk]) -> PromptPackage:
system_prompt = (
"你是企业级 AI 搜索助手。"
"请严格基于参考资料回答,不要编造。"
"如果资料不足,请明确指出不确定。"
"回答结论后标注引用编号,例如[1][2]。"
"最后附上“参考来源”列表。"
)
token_budget = settings.max_context_tokens
consumed = 0
context_blocks = []
citations = []
for idx, chunk in enumerate(chunks, start=1):
block = f"[{idx}] 标题:{chunk.title}\n来源:{chunk.url or chunk.doc_id}\n内容:{chunk.content.strip()}"
block_tokens = len(self.encoding.encode(block))
if consumed + block_tokens > token_budget:
break
consumed += block_tokens
context_blocks.append(block)
citations.append(
{
"index": idx,
"title": chunk.title,
"url": str(chunk.url) if chunk.url else None,
"source": chunk.source.value,
}
)
user_prompt = (
f"用户问题:\n{query}\n\n"
f"参考资料:\n{'\n\n'.join(context_blocks)}\n\n"
"请输出:\n"
"1. 先给结论\n"
"2. 再给关键依据\n"
"3. 如果存在不确定性,单独说明\n"
"4. 文末列出参考来源"
)
return PromptPackage(
system_prompt=system_prompt,
user_prompt=user_prompt,
citations=citations,
)
10.3 为什么要预留回答空间
如果把上下文塞满整个窗口,会有三个副作用:
- 模型几乎没有余量生成答案
- 推理延迟显著上升
- 重要信息被边缘化
一般经验:
- 60% 给参考资料
- 10% 给 system prompt 和格式约束
- 30% 留给最终回答
这是比“能塞多长塞多长”更稳定的策略。
十一、LLM 推理服务:流式输出、并发控制、超时治理
11.1 Ollama 客户端封装
# app/infrastructure/llm/ollama_client.py
from __future__ import annotations
import asyncio
import json
from collections.abc import AsyncIterator
import httpx
from app.config import settings
class OllamaClient:
def __init__(self) -> None:
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(settings.ollama_timeout_seconds, connect=5.0),
limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
)
self._semaphore = asyncio.Semaphore(settings.llm_max_concurrency)
async def chat_stream(self, system_prompt: str, user_prompt: str) -> AsyncIterator[str]:
payload = {
"model": settings.ollama_chat_model,
"stream": True,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"options": {
"temperature": 0.2,
"top_p": 0.9,
"num_predict": 1024,
},
}
async with self._semaphore:
async with self._client.stream("POST", f"{settings.ollama_base_url}/api/chat", json=payload) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if not line.strip():
continue
data = json.loads(line)
if data.get("done"):
break
message = data.get("message", {})
content = message.get("content")
if content:
yield content
async def close(self) -> None:
await self._client.aclose()
11.2 这里最容易犯的几个错
错误 1:用“请求数”当并发控制
LLM 的瓶颈不是 HTTP 请求数,而是模型推理资源。
正确做法是:
错误 2:没有连接池和 keepalive
如果每次都新建连接,TTFT 会被网络和握手拖慢。
错误 3:把搜索和推理绑定死
外部搜索偶发失败时,不应该整个系统不可用。
应该允许“无外部搜索降级回答”或者“仅内部知识库回答”。
十二、编排器:把一切串起来的关键服务
# app/application/orchestrator.py
from __future__ import annotations
import time
from collections.abc import AsyncIterator
from app.domain.models import SearchRequest
class SearchOrchestrator:
def __init__(self, rewrite_service, retrieval_service, context_service, answer_service):
self.rewrite_service = rewrite_service
self.retrieval_service = retrieval_service
self.context_service = context_service
self.answer_service = answer_service
async def stream_answer(self, request: SearchRequest) -> AsyncIterator[str]:
started_at = time.perf_counter()
degraded = False
rewritten = self.rewrite_service.rewrite(request.query)
chunks = await self.retrieval_service.retrieve(
query=rewritten.original_query,
search_queries=rewritten.search_queries,
top_k=request.top_k,
enable_web_search=request.enable_web_search,
enable_internal_search=request.enable_internal_search,
)
if not chunks:
degraded = True
prompt = self.context_service.build_prompt(request.query, chunks)
async for token in self.answer_service.answer_stream(prompt):
yield f"data: {token}\n\n"
latency_ms = int((time.perf_counter() - started_at) * 1000)
tail_payload = {
"type": "final",
"latency_ms": latency_ms,
"degraded": degraded,
"citations": prompt.citations,
}
yield f"data: {tail_payload}\n\n"
yield "data: [DONE]\n\n"
这个 orchestrator 的设计目标很明确:
- 上层 API 不关心底层实现
- 底层组件都可以独立替换
- 整个请求链路可以打通 trace_id 和 metrics
十三、API 层实现:支持 SSE 流式输出
# app/api/routes_search.py
from __future__ import annotations
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from app.api.deps import get_orchestrator
from app.domain.models import SearchRequest
router = APIRouter(prefix="/api/v1", tags=["search"])
@router.post("/search")
async def search(request: SearchRequest, orchestrator=Depends(get_orchestrator)):
stream = orchestrator.stream_answer(request)
return StreamingResponse(
stream,
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
为什么生产里更推荐 SSE?
- 实现简单
- 浏览器原生支持好
- 对“服务端单向推流”场景足够
- 比 WebSocket 更适合问答输出
只有当你需要:
才更建议上 WebSocket。
十四、生产级缓存设计
14.1 两级缓存实现思路
# app/infrastructure/cache/memory_cache.py
from __future__ import annotations
import time
class MemoryCache:
def __init__(self, max_size: int = 1024):
self.max_size = max_size
self.store: dict[str, tuple[float, object]] = {}
def get(self, key: str, ttl_seconds: int):
item = self.store.get(key)
if not item:
return None
saved_at, value = item
if time.time() - saved_at > ttl_seconds:
self.store.pop(key, None)
return None
return value
def set(self, key: str, value: object):
if len(self.store) >= self.max_size:
oldest_key = min(self.store.keys(), key=lambda item: self.store[item][0])
self.store.pop(oldest_key, None)
self.store[key] = (time.time(), value)
# app/infrastructure/cache/redis_cache.py
from __future__ import annotations
import json
import redis.asyncio as redis
class RedisCache:
def __init__(self, redis_url: str):
self.client = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)
async def get_json(self, key: str):
value = await self.client.get(key)
return json.loads(value) if value else None
async def set_json(self, key: str, value: dict | list, ttl_seconds: int):
await self.client.setex(key, ttl_seconds, json.dumps(value, ensure_ascii=False))
14.2 什么值得缓存
最值得缓存的是:
- 搜索结果
- 网页正文抽取结果
- 热点问题最终答案
- Query Rewrite 结果
不建议长期缓存的是:
- 强时效问题答案
- 权限敏感上下文
- 带用户个性化偏好的最终生成结果
14.3 缓存穿透与击穿怎么防
至少做三件事:
- 空结果短 TTL 缓存
- 热 key singleflight 合并
- 随机 TTL 抖动避免雪崩
十五、可观测性建设:没有观测就没有生产系统
15.1 关键指标
建议采集以下指标。
用户体验指标
- QPS
- P50/P95/P99 总延迟
- TTFT 首 Token 延迟
- 每次回答平均 Token 数
- 成功率
检索质量指标
- 每次查询召回数
- Rerank 前后命中分布
- 最终引用来源数
- 空召回率
- 回答拒答率
系统资源指标
- GPU 使用率
- 显存使用率
- CPU 和内存
- Redis 命中率
- 外部搜索超时率
15.2 Prometheus 指标示例
# app/infrastructure/observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge
request_total = Counter(
"ai_search_request_total",
"Total count of ai search requests",
["tenant_id", "status"],
)
request_latency = Histogram(
"ai_search_request_latency_ms",
"Latency of ai search request in ms",
buckets=(50, 100, 300, 500, 1000, 2000, 3000, 5000, 10000),
)
llm_inflight = Gauge(
"ai_search_llm_inflight",
"Current inflight requests of llm generation",
)
15.3 为什么要记录检索中间态
线上排查时最痛苦的不是报错,而是“答得不准但也没报错”。
所以建议保存:
- 原始 query
- 改写后的 query
- 各路召回结果
- 精排前后排序
- 最终上下文片段
- 生成答案
当然要注意脱敏和审计权限。
有了这些信息,后续做离线评测、效果回放、Prompt 调优会轻松很多。
十六、离线评测体系:没有评测,就无法持续优化
很多团队做 AI 搜索时,只看“感觉好不好”。这在早期可以,但系统想持续迭代,必须有评测集。
16.1 建议评测维度
- Answer Relevance:答案是否回答了问题
- Context Precision:召回内容是否真的相关
- Faithfulness:答案是否忠于上下文
- Citation Accuracy:引用是否正确
- Latency:延迟是否可接受
16.2 实践建议
建立一批高价值问答对:
- 新人 onboarding 常问问题
- 线上事故排障问题
- 业务制度和流程问题
- 高频客服问题
每次改 Prompt、改模型、改召回策略,都跑一轮离线评测。
这比“我感觉这次更聪明了”可靠得多。
十七、生产级部署方案
17.1 Docker Compose 版本
单机或小团队验证环境,推荐先用 Compose。
version: "3.9"
services:
api:
build: .
restart: always
ports:
- "8000:8000"
environment:
OLLAMA_BASE_URL: http://ollama:11434
REDIS_URL: redis://redis:6379/0
SEARXNG_BASE_URL: http://searxng:8080
depends_on:
- ollama
- redis
- searxng
ollama:
image: ollama/ollama:latest
restart: always
ports:
- "11434:11434"
volumes:
- ollama_data:/root/.ollama
environment:
OLLAMA_KEEP_ALIVE: "24h"
OLLAMA_NUM_PARALLEL: "4"
OLLAMA_MAX_LOADED_MODELS: "2"
redis:
image: redis:7-alpine
restart: always
command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
ports:
- "6379:6379"
searxng:
image: searxng/searxng:latest
restart: always
ports:
- "8080:8080"
volumes:
ollama_data:
17.2 Kubernetes 版本需要关注什么
到了 K8s,重点不是“能跑起来”,而是这些问题:
- GPU 节点调度
- 模型服务与 API 服务分离扩容
- HPA 指标选择
- 冷启动和模型预热
- Pod 优雅终止,避免流式连接中断
- ConfigMap / Secret 管理
推荐拆分为:
api-service
retrieval-service
rerank-service
llm-service
ingestion-job
这样可以按资源特征独立伸缩。
十八、两个真实业务场景,看看架构如何落地
18.1 场景一:企业内部知识问答
问题示例:
“支付服务出现下游超时,标准排查路径是什么?”
系统执行路径:
- Query Rewrite 把“标准排查路径”改写为“runbook / troubleshooting”
- 内部知识库 BM25 命中《支付超时故障排查手册》
- 向量检索召回《网关重试风暴事故复盘》
- Rerank 发现 runbook 文档优先级最高
- Context Builder 选取 SOP 中的关键步骤和阈值
- LLM 输出结论并引用内部文档片段
这个场景里:
- 公网搜索不是主角
- 内部知识库质量决定最终效果
- 权限过滤必须生效
18.2 场景二:本地技术研究助手
问题示例:
“为什么 Java 应用在 Kubernetes 里内存看起来比 Xmx 大很多?”
系统执行路径:
- 外部搜索召回 JVM、容器内存、page cache 等公开资料
- 内部知识库召回本团队的 K8s JVM 调优手册
- 混合重排后,将“原理解释 + 团队标准实践”一起送入上下文
- 最终答案既解释原理,也给出组织内推荐排查路径
这个场景的价值在于:
- 外部资料提供广度
- 内部资料提供落地性
- AI 输出把两者结合起来
这正是“本地 AI 搜索引擎”比“纯聊天机器人”更有价值的地方。
十九、常见架构误区
误区 1:把大模型当搜索引擎
模型不具备稳定、可控、可追踪的检索能力。
搜索一定要由检索系统完成,模型负责理解和组织答案。
误区 2:只做向量检索,不做关键词检索
很多企业文档里有:
这些内容向量检索不一定强,关键词检索必须保留。
误区 3:只看回答流畅度,不看忠实度
最危险的答案,往往不是“答不上来”,而是“答得很像对的”。
误区 4:把 Prompt 当成唯一优化手段
在 AI 搜索里,通常优先级更高的是:
- 改善召回
- 做好重排
- 控制上下文
- 最后才是微调 Prompt
误区 5:把所有逻辑塞进一个 Python 文件
前期快,后期一定难维护。
检索、上下文、生成、缓存、观测最好从一开始就解耦。
二十、性能优化路线图
如果你准备把这套系统从 PoC 推向线上,建议按下面顺序优化。
第一阶段:先保证正确性
- 加入引用
- 限制幻觉
- 建立离线评测集
- 打通内部知识库入库链路
第二阶段:再优化延迟
- 并行多路召回
- 模型常驻显存
- 抽取结果缓存
- 热点问题答案缓存
- 精简上下文
第三阶段:再提升吞吐
- 拆分检索服务和生成服务
- 对 LLM 层做并发门控
- 做租户限流
- 扩展多实例和负载均衡
第四阶段:最后优化演进能力
- 多模型灰度
- 多索引版本切换
- 自动评测
- A/B Test
这条路线比“一上来就上最复杂架构”更现实。
二十一、最小可运行示例的依赖清单
fastapi>=0.115.0
uvicorn[standard]>=0.30.0
httpx>=0.27.0
pydantic>=2.8.0
pydantic-settings>=2.3.0
redis[hiredis]>=5.0.0
tiktoken>=0.7.0
trafilatura>=1.12.0
beautifulsoup4>=4.12.0
prometheus-client>=0.20.0
如果你计划继续往生产推进,建议再引入:
- PostgreSQL:审计与回放
- OpenTelemetry:trace
- Milvus / OpenSearch / pgvector:更稳的检索底座
- Kafka:异步入库和索引更新
二十二、一份更完整的回答服务实现
为了让整条链路闭环,这里补一版 AnswerService。
# app/application/answer_service.py
from __future__ import annotations
from collections.abc import AsyncIterator
class AnswerService:
def __init__(self, ollama_client):
self.ollama_client = ollama_client
async def answer_stream(self, prompt_package) -> AsyncIterator[str]:
async for token in self.ollama_client.chat_stream(
system_prompt=prompt_package.system_prompt,
user_prompt=prompt_package.user_prompt,
):
yield token
它看起来简单,但这是好事。
因为复杂度已经被前面的分层吸收掉了。
如果后续要加:
- 安全审查
- 敏感词检测
- 输出后处理
- Markdown 转富文本
都可以在这个服务里扩展,而不污染其它层。
二十三、如果你要继续做生产化,下一步应该补什么
这篇文章已经给出了一套能支撑中小规模生产落地的主干方案。
如果继续向企业级演进,建议优先补以下能力:
1. 文档权限过滤
让召回阶段就遵守 ACL,而不是回答后再处理。
2. 索引版本管理
支持双索引切换、灰度回滚、增量重建。
3. 对话记忆摘要
多轮问答时,不要把整个历史硬拼进上下文,要做摘要压缩。
4. 结构化答案输出
让模型输出 JSON Schema,方便前端渲染卡片、来源、置信度。
5. RAG 评测平台
把评测变成日常工程动作,而不是临时脚本。
二十四、全文总结
从架构视角看,本地 AI 搜索引擎的关键,不是“把 Ollama 跑起来”,而是把下面四件事真正做好:
- 检索做对:混合召回、查询改写、精排去重、权限过滤
- 生成做稳:上下文预算、引用约束、流式输出、降级策略
- 工程做实:缓存、限流、熔断、观测、评测、回放
- 架构做活:模型解耦、索引解耦、服务拆分、可灰度演进
如果只做 Demo,你会得到一个“偶尔惊艳”的系统。
如果按本文的思路做工程化升级,你更有机会得到一个“长期可维护、可持续优化、能真正服务业务”的生产级 AI 搜索平台。
附:一套推荐的技术选型组合
如果你希望快速落地,同时保留后续扩展空间,我会推荐下面这套组合:
| 能力域 |
推荐选型 |
原因 |
| API 服务 |
FastAPI |
异步友好,适合 SSE |
| 本地推理 |
Ollama |
本地部署简单,生态成熟 |
| Embedding |
BGE-M3 / bge-small-zh |
中文效果稳定 |
| Rerank |
BGE Reranker / BCE Reranker |
精排性价比高 |
| 公网搜索 |
SearXNG |
自托管、可控 |
| 向量存储 |
pgvector / Milvus |
从轻量到规模化都可覆盖 |
| 关键词检索 |
PostgreSQL FTS / OpenSearch |
对错误码、术语、类名更友好 |
| 缓存 |
Redis |
TTL、分布式、生态成熟 |
| 观测 |
Prometheus + Grafana + OTel |
生产必备 |
| 编排 |
Python AsyncIO |
开发效率和 I/O 并发都不错 |
如果是我来落地这类系统,通常会分三步走:
- 第一步:FastAPI + Ollama + SearXNG + Redis,先跑通主链路
- 第二步:补向量检索、Rerank、离线入库和评测
- 第三步:拆服务、上 K8s、做权限和多租户
这个演进路径足够务实,也足够贴近真实项目。