找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3788

积分

0

好友

502

主题
发表于 1 小时前 | 查看: 3| 回复: 0

如果说 ChatGPT 更像“对话式生成器”,Perplexity 更像“带证据链的答案引擎”,那么企业真正需要的,往往是第三种系统:既能接入公网搜索,也能接入内部知识库;既能流式回答,也能给出可追溯来源;既要低成本本地化部署,也要扛住高并发、可观测、可扩展。

本文不再停留在“搜索 + LLM = AI 搜索”这一层,而是从架构师视角,完整拆解一套可落地、可演进、可生产部署的本地 AI 搜索引擎方案。在云栈社区,我们一直探讨如何将前沿的AI技术真正工程化落地,而这套方案正是对“LLM应用”与“后端 & 架构”深度结合的一次完整实践。


一、为什么企业真正需要的是“本地 AI 搜索引擎”

很多团队一开始会把 AI 搜索想简单了:

  • 给大模型一个问题
  • 调用搜索引擎拿几个网页
  • 把网页正文拼进 Prompt
  • 让模型输出答案

这个 Demo 在技术分享里没问题,但一旦放到真实业务,就会迅速暴露四类系统性问题。

1.1 数据合规问题

企业知识往往来自:

  • 内部 Wiki
  • 研发文档
  • 运维 SOP
  • 代码仓库
  • 事故复盘
  • 客服知识库
  • 合同、制度、流程文档

这些内容通常不能直接上传到公网模型。尤其在医疗、金融、政企、制造场景,数据出域本身就不被允许。

1.2 搜索质量问题

单纯“搜网页 + 拼上下文”会遇到:

  • 召回不全:只靠单路搜索,容易漏掉关键资料
  • 相关性不稳:标题匹配但正文无效
  • 冗余严重:多个页面内容重复,浪费上下文窗口
  • 幻觉风险高:模型把“检索片段”脑补成“结论”

1.3 延迟与吞吐问题

一个能真正上线的 AI 搜索系统,必须回答以下问题:

  • 首 Token 延迟能否控制在 2s 以内
  • 单机或单集群能否承载百级到千级并发
  • 检索、抽取、重排、生成这些阶段是否并行
  • 热点问题是否有缓存
  • 模型冷启动和上下文过长是否被治理

1.4 工程治理问题

当系统开始服务真实用户时,架构重点就从“能不能回答”转向:

  • 可观测:哪一步慢,哪一步错
  • 可回放:为什么这个答案这样生成
  • 可灰度:新模型、新检索策略如何上线
  • 可隔离:搜索服务抖动时,生成链路能否降级
  • 可扩展:后续如何接入内部知识库、权限体系、多租户

所以,从企业视角看,本地 AI 搜索引擎的本质不是一个 Prompt 工程 Demo,而是一个“检索增强型答案系统”。


二、先给结论:一套生产级 AI 搜索引擎应该长什么样

2.1 核心能力清单

一套可上线的本地 AI 搜索引擎,至少要包含下面这些能力:

  • 混合召回:公网搜索 + 内部知识库 + 向量检索 + 关键词检索
  • 查询重写:把自然语言问题改写成更适合搜索的形式
  • 多阶段排序:粗排、精排、去重、时效性加权
  • 上下文治理:Chunk 切分、Token 预算、窗口裁剪、引用编号
  • 流式回答:SSE/WebSocket 输出,降低用户感知等待
  • 结果缓存:搜索结果缓存、抽取正文缓存、答案缓存
  • 并发治理:限流、连接池、熔断、隔离舱、降级
  • 可观测性:trace、metrics、日志、回放
  • 多模型演进:推理模型、Embedding 模型、Rerank 模型解耦

2.2 推荐总体架构

                          ┌─────────────────────┐
                          │     Web / APP        │
                          └──────────┬───────────┘
                                     │
                          ┌──────────▼───────────┐
                          │ API Gateway / BFF    │
                          │ Auth / Quota / CORS  │
                          └──────────┬───────────┘
                                     │
               ┌─────────────────────▼─────────────────┐
               │       Search Orchestrator              │
               │  Query Rewrite / Fanout / Merge        │
               └───────┬──────────┬──────────┬─────────┘
                       │          │          │
           ┌───────────▼───┐ ┌────▼────┐ ┌──▼──────────┐
           │ Web Search    │ │ Vector  │ │ BM25 / FTS   │
           │ SearXNG       │ │ Store   │ │ OpenSearch   │
           └───────────┬───┘ └────┬────┘ └──┬──────────┘
                       │          │          │
                       └──────┬───┴──────┬───┘
                              │          │
                  ┌───────────▼──────────▼───────┐
                  │   Recall Merge + Rerank      │
                  │ Dedup / Freshness / Score    │
                  └───────────┬──────────────────┘
                              │
                  ┌───────────▼──────────┐
                  │ Context Builder      │
                  │ Chunk / Budget       │
                  └───────────┬──────────┘
                              │
              ┌───────────────▼──────────────┐
              │     LLM Answer Service       │
              │ Ollama / vLLM / llama.cpp    │
              └───────────────┬──────────────┘
                              │
                  ┌───────────▼──────────┐
                  │ SSE / Streaming      │
                  └──────────────────────┘

旁路支撑能力:
- Redis:缓存、幂等、热点保护
- PostgreSQL:审计、会话、检索记录、离线评测
- Prometheus + Grafana:观测
- Kafka / RabbitMQ:异步入库、索引更新

2.3 一句话理解这套架构

它不是“让 LLM 去搜索”,而是“由检索编排层先构造证据,再让 LLM 基于证据生成答案”。

这个差异决定了系统的可控性和可生产性。


三、核心原理:为什么 AI 搜索不是简单的 RAG

3.1 RAG 的基本流程

RAG 的经典链路是:

  1. 用户提问
  2. 从知识库召回相关内容
  3. 将内容拼入 Prompt
  4. 大模型基于检索结果生成答案

这个流程没错,但在生产环境中,真正决定效果的不是“有没有检索”,而是以下四个工程细节。

3.2 Query Rewrite:用户问题通常不适合直接检索

用户会这样问:

  • “我们上次线上支付超时怎么排查的?”
  • “为什么 Java 服务在 k8s 里 RSS 很高?”
  • “本地知识库和公网搜索结果冲突时应该信谁?”

这些问题直接拿去做搜索,经常命中不准。原因很简单:自然语言问题不等于高质量检索语句。

生产级做法通常是把用户问题拆成三个视角:

  • 原始问题:保留用户真实意图
  • 搜索重写:提炼关键词、限定领域、补齐专业术语
  • 多跳问题:复杂问题拆成多个子问题并行召回

例如:

原问题:
为什么 Java 服务在 k8s 里 RSS 很高?

重写后:
Kubernetes Java RSS memory high container memory usage heap off-heap page cache

子问题:
1. Java 进程 RSS 由哪些部分组成
2. 容器内 RSS 和 JVM Heap 的关系
3. k8s 场景下如何排查 off-heap / mmap / page cache

这一步会直接提升召回质量。

3.3 Recall:单路召回在企业场景一定不够

生产系统通常不只一条召回链路,而是多路并行:

  • Web Search:适合查最新信息、公开资料、社区经验
  • BM25/全文检索:适合精准关键词命中
  • Vector Search:适合语义相似问题和口语化表达
  • Metadata Filter:适合按部门、项目、时间、文档类型筛选

为什么要混合召回?

  • 关键词检索的精确性高,但语义泛化弱
  • 向量检索的召回广,但容易“像但不对”
  • 公网搜索时效性强,但可信度不稳定
  • 内部知识库可信度高,但覆盖范围有限

所以最常见的正确姿势是:

关键词召回保精度 + 向量召回保覆盖 + 公网召回补时效

3.4 Rerank:召回出来不代表能直接用

很多 Demo 在召回后直接把 TopK 拼进 Prompt。问题在于:

  • TopK 里可能有重复页面
  • 片段可能和问题只“部分相关”
  • 多个来源之间可能互相冲突
  • 一些片段虽然相关,但信息密度极低

因此需要至少两级排序:

  • 粗排:关键词得分、来源权重、时间衰减、点击先验
  • 精排:Cross-Encoder Rerank,对“问题-文档片段”做成对打分

Cross-Encoder 的本质是:不是单独编码 query 和 document 再算余弦,而是把两者一起输入模型,让模型直接判断“这段内容到底能不能回答这个问题”。

在复杂问题里,这一步的提升通常比“换更大的生成模型”更明显。

3.5 Grounded Generation:答案必须被证据约束

生产级 AI 搜索和普通聊天机器人最大的区别,不是答案更长,而是答案更“可证伪”。

推荐的生成约束包括:

  • 仅依据提供的参考资料回答
  • 资料不足时明确说明不确定
  • 结论后标注引用编号
  • 输出“答案 + 证据来源 + 不确定性声明”

也就是说,我们需要的是:

answerablefluent 更重要
faithfulcreative 更重要


四、生产级架构设计:从 Demo 到可上线系统

4.1 分层架构

推荐把系统拆成 6 层。

接入层

职责:

  • 鉴权
  • 限流
  • 请求校验
  • SSE / WebSocket 流式输出
  • 租户隔离

可选技术:

  • Nginx / Kong / APISIX
  • FastAPI / Starlette

编排层

职责:

  • 查询重写
  • 多路召回 fanout
  • 召回合并
  • 结果去重
  • 上下文构建
  • 答案生成工作流编排

这一层是整个系统的大脑。

检索层

职责:

  • 外部搜索引擎访问
  • 内部知识库召回
  • 向量检索
  • 关键词检索
  • 元数据过滤

模型层

职责:

  • Embedding
  • Rerank
  • LLM 推理

建议分离部署,而不是混在一个 Python 进程里。

数据层

职责:

  • 文档原文存储
  • 向量索引
  • 倒排索引
  • 缓存
  • 会话和审计

治理层

职责:

  • 指标采集
  • 链路追踪
  • 审计回放
  • 灰度发布
  • 离线评测

4.2 在线链路时序图

(图略,建议补充系统交互时序图)

4.3 离线入库链路

在线搜索只是一半,企业真正可持续的价值来自“知识资产化”。

离线链路通常包括:

  1. 文档采集:Wiki、PDF、Markdown、代码仓库、数据库、SOP
  2. 内容解析:OCR、正文提取、表格抽取、代码块保留
  3. 文本标准化:去噪、清洗、统一编码
  4. Chunk 切分:按标题、语义、长度切块
  5. 元数据注入:租户、部门、项目、版本、时间、权限
  6. Embedding:生成向量
  7. 索引写入:向量库 + 倒排索引
  8. 版本管理:支持回滚和双索引切换

线上效果稳定的关键,往往不在 LLM,而在这条离线链路做得是否扎实。


五、工程化升级重点:高并发、可扩展、可治理

5.1 并发模型:不是所有步骤都应该串行

一次查询大致包含这些阶段:

  • 请求校验
  • 查询改写
  • 多路召回
  • 正文抽取
  • 重排
  • 上下文构建
  • LLM 推理

其中最适合并行化的是:

  • 多路召回
  • 多页面正文抽取
  • 多子问题 fanout

其中最需要限流的是:

  • LLM 推理
  • Rerank 模型
  • 外部搜索引擎访问

推荐的原则:

  • I/O 密集阶段异步并发
  • GPU/CPU 重计算阶段显式信号量控制
  • 慢依赖通过超时、隔离舱和熔断保护

5.2 缓存设计:热点问题是天然红利

AI 搜索的缓存不是单一缓存,而是多层缓存。

L1:进程内缓存

适合缓存:

  • 热门 query 的改写结果
  • 热门 query 的召回结果
  • 常见文档正文抽取结果

特点:

  • 命中快
  • 适合短 TTL
  • 不能跨实例共享

L2:Redis 分布式缓存

适合缓存:

  • 搜索结果
  • 片段抽取结果
  • 生成答案
  • 会话上下文摘要

缓存 Key 设计建议

不要只用 query 当 key,建议带上:

  • query hash
  • tenant id
  • model id
  • retrieval profile
  • topk
  • prompt version

否则一旦 Prompt 或召回策略升级,缓存会污染新结果。

5.3 限流与熔断

生产环境中,最常见的两个崩法是:

  • 突发并发把 Ollama 打满,排队雪崩
  • 外部搜索引擎超时,把整个请求链路拖死

推荐治理策略:

  • API 层限流:按租户、按用户、按 IP 配额
  • LLM 层并发信号量:限制同时生成数
  • 外部搜索熔断:连续失败后短时间走降级
  • 摘要/精排降级:超时时直接跳过,不阻塞主链路

5.4 多模型解耦

千万不要把“一个模型解决所有问题”当成默认设计。

推荐解耦成三类模型:

  • Embedding 模型:负责语义召回
  • Rerank 模型:负责问题-片段精排
  • Generation 模型:负责最终答案生成

这样做的好处是:

  • 可分别扩容
  • 可分别灰度
  • 可根据成本做替换
  • 可以针对中文场景和英文场景独立优化

5.5 多租户与权限控制

企业知识库接入后,权限是必须做的,而不是“以后再说”。

最少要支持:

  • 文档级权限
  • 部门级权限
  • 租户隔离
  • 来源可见性控制

检索时的权限过滤必须前置在召回阶段,而不是等答案生成后再做脱敏。
因为一旦召回阶段就把不该看的内容送进模型,上下文已经泄漏了。


六、项目结构设计:一份更接近生产环境的代码组织

local-ai-search/
├── app/
│   ├── main.py
│   ├── config.py
│   ├── bootstrap.py
│   ├── api/
│   │   ├── deps.py
│   │   ├── routes_health.py
│   │   └── routes_search.py
│   ├── domain/
│   │   ├── models.py
│   │   ├── enums.py
│   │   └── contracts.py
│   ├── application/
│   │   ├── query_rewrite_service.py
│   │   ├── retrieval_service.py
│   │   ├── answer_service.py
│   │   ├── context_service.py
│   │   └── orchestrator.py
│   ├── infrastructure/
│   │   ├── cache/
│   │   │   ├── memory_cache.py
│   │   │   └── redis_cache.py
│   │   ├── llm/
│   │   │   └── ollama_client.py
│   │   ├── retrieval/
│   │   │   ├── searxng_client.py
│   │   │   ├── vector_store.py
│   │   │   ├── keyword_store.py
│   │   │   └── rerank_client.py
│   │   ├── parser/
│   │   │   └── content_extractor.py
│   │   └── observability/
│   │       ├── metrics.py
│   │       └── tracing.py
│   └── shared/
│       ├── retry.py
│       ├── hashing.py
│       ├── token_budget.py
│       └── jsonlog.py
├── scripts/
│   ├── ingest_docs.py
│   └── evaluate_rag.py
├── tests/
│   ├── unit/
│   ├── integration/
│   └── e2e/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── README.md

这样分层的意义是:

  • domain 只放业务对象和接口抽象
  • application 放编排逻辑
  • infrastructure 放具体实现

后续如果从 Ollama 切到 vLLM、从 Chroma 切到 Milvus,改动会局部很多。


七、核心数据模型设计

7.1 Pydantic 模型

# app/domain/models.py
from __future__ import annotations

from enum import Enum
from typing import Any
from pydantic import BaseModel, Field, HttpUrl

class RetrievalSource(str, Enum):
    WEB = "web"
    INTERNAL = "internal"
    VECTOR = "vector"
    KEYWORD = "keyword"

class SearchRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=1000)
    tenant_id: str = Field(..., min_length=1, max_length=64)
    user_id: str = Field(..., min_length=1, max_length=64)
    session_id: str | None = Field(default=None, max_length=128)
    stream: bool = True
    top_k: int = Field(default=8, ge=1, le=20)
    enable_web_search: bool = True
    enable_internal_search: bool = True
    retrieval_profile: str = Field(default="balanced", pattern="^(balanced|fast|deep)$")

class RetrievedChunk(BaseModel):
    chunk_id: str
    doc_id: str
    title: str
    content: str
    url: HttpUrl | None = None
    source: RetrievalSource
    score: float = 0.0
    metadata: dict[str, Any] = Field(default_factory=dict)

class Citation(BaseModel):
    index: int
    title: str
    url: HttpUrl | None = None
    source: RetrievalSource

class SearchAnswer(BaseModel):
    answer: str
    citations: list[Citation] = Field(default_factory=list)
    latency_ms: int = 0
    trace_id: str
    degraded: bool = False
    debug: dict[str, Any] = Field(default_factory=dict)

7.2 为什么要显式建模 RetrievedChunk

因为生产环境的排序、过滤、审计都要围绕 chunk,而不是原始 document。

原因包括:

  • 一个长文档只能局部相关
  • chunk 便于引用编号
  • chunk 粒度更适合重排
  • chunk 更适合构建 token 预算

八、配置设计:让系统能跑在开发、测试、生产三个环境

# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    app_name: str = "local-ai-search"
    env: str = "dev"
    host: str = "0.0.0.0"
    port: int = 8000

    ollama_base_url: str = "http://ollama:11434"
    ollama_chat_model: str = "qwen2.5:7b-instruct-q4_K_M"
    ollama_embed_model: str = "bge-m3:latest"
    ollama_timeout_seconds: int = 120
    llm_max_concurrency: int = 4

    searxng_base_url: str = "http://searxng:8080"
    web_search_timeout_ms: int = 2500

    redis_url: str = "redis://redis:6379/0"
    postgres_dsn: str = "postgresql+psycopg://postgres:postgres@postgres:5432/aisearch"

    max_context_tokens: int = 6000
    max_chunk_tokens: int = 500
    chunk_overlap_tokens: int = 80

    retrieval_timeout_ms: int = 3000
    rerank_timeout_ms: int = 1500
    cache_ttl_search_seconds: int = 300
    cache_ttl_answer_seconds: int = 180

    trace_enabled: bool = True
    metrics_enabled: bool = True
    log_level: str = "INFO"

settings = Settings()

这里的设计重点不是字段多,而是:

  • 超时独立可配
  • 模型独立可配
  • 上下文预算独立可配
  • 缓存 TTL 独立可配

后期调优会非常频繁,配置项必须充分外置。


九、检索链路设计:生产系统的真正核心

9.1 查询重写服务

# app/application/query_rewrite_service.py
from __future__ import annotations

import re
from dataclasses import dataclass

@dataclass(slots=True)
class RewriteResult:
    original_query: str
    search_queries: list[str]
    intent: str

class QueryRewriteService:
    """
    轻量版查询改写器。
    生产环境可替换为小模型改写、规则引擎或领域词典增强。
    """

    def rewrite(self, query: str) -> RewriteResult:
        normalized = re.sub(r"\s+", " ", query.strip())
        lowered = normalized.lower()

        search_queries = [normalized]

        if "k8s" in lowered:
            search_queries.append(normalized.replace("k8s", "kubernetes"))
        if "rss" in lowered:
            search_queries.append(f"{normalized} off-heap mmap page cache")
        if "排查" in normalized:
            search_queries.append(f"{normalized} runbook troubleshooting")

        # 去重保序
        deduped = list(dict.fromkeys(search_queries))
        intent = "diagnosis" if "排查" in normalized or "为什么" in normalized else "qa"
        return RewriteResult(
            original_query=normalized,
            search_queries=deduped,
            intent=intent,
        )

这个实现不复杂,但已经体现了一条重要原则:

查询重写服务本质上是一个可演进策略点

你可以先用规则,再换小模型,再叠加领域词典。

9.2 召回聚合服务

# app/application/retrieval_service.py
from __future__ import annotations

import asyncio
from typing import Iterable

from app.domain.models import RetrievedChunk, RetrievalSource

class RetrievalService:
    def __init__(self, web_client, vector_store, keyword_store, rerank_client):
        self.web_client = web_client
        self.vector_store = vector_store
        self.keyword_store = keyword_store
        self.rerank_client = rerank_client

    async def retrieve(
        self,
        query: str,
        search_queries: list[str],
        top_k: int,
        enable_web_search: bool,
        enable_internal_search: bool,
    ) -> list[RetrievedChunk]:
        tasks = []

        if enable_web_search:
            tasks.append(self._safe_web_search(search_queries))
        if enable_internal_search:
            tasks.append(self._safe_vector_search(query, top_k * 3))
            tasks.append(self._safe_keyword_search(query, top_k * 3))

        recall_results = await asyncio.gather(*tasks, return_exceptions=True)

        merged = []
        for result in recall_results:
            if isinstance(result, Exception):
                continue
            merged.extend(result)

        deduped = self._deduplicate(merged)
        reranked = await self.rerank_client.rerank(query, deduped, top_k=top_k)
        return reranked

    async def _safe_web_search(self, queries: list[str]) -> list[RetrievedChunk]:
        results = await self.web_client.multi_search(queries)
        return [
            RetrievedChunk(
                chunk_id=item["id"],
                doc_id=item["doc_id"],
                title=item["title"],
                content=item["content"],
                url=item.get("url"),
                source=RetrievalSource.WEB,
                score=float(item.get("score", 0.0)),
                metadata=item.get("metadata", {}),
            )
            for item in results
        ]

    async def _safe_vector_search(self, query: str, limit: int) -> list[RetrievedChunk]:
        return await self.vector_store.search(query, limit=limit)

    async def _safe_keyword_search(self, query: str, limit: int) -> list[RetrievedChunk]:
        return await self.keyword_store.search(query, limit=limit)

    def _deduplicate(self, chunks: Iterable[RetrievedChunk]) -> list[RetrievedChunk]:
        seen = set()
        output: list[RetrievedChunk] = []
        for chunk in chunks:
            dedup_key = (chunk.doc_id, chunk.content[:120])
            if dedup_key in seen:
                continue
            seen.add(dedup_key)
            output.append(chunk)
        return output

这里的关键点有三个:

  • 多路召回并行执行
  • 单路失败不拖垮整体
  • 合并后再统一精排

9.3 Rerank 服务

# app/infrastructure/retrieval/rerank_client.py
from __future__ import annotations

from app.domain.models import RetrievedChunk

class RerankClient:
    """
    示例实现:先融合规则分,再预留 cross-encoder 精排接口。
    生产环境推荐接 BGE Reranker / jina-reranker / bce-reranker。
    """

    async def rerank(self, query: str, chunks: list[RetrievedChunk], top_k: int) -> list[RetrievedChunk]:
        query_terms = set(query.lower().split())

        for chunk in chunks:
            title_text = f"{chunk.title} {chunk.content[:500]}".lower()
            lexical_score = sum(1 for term in query_terms if term in title_text) / max(len(query_terms), 1)

            freshness_boost = 0.0
            if chunk.metadata.get("updated_at"):
                freshness_boost = 0.05

            source_boost = 0.08 if chunk.source.value == "internal" else 0.03

            chunk.score = chunk.score * 0.5 + lexical_score * 0.35 + freshness_boost + source_boost

        chunks.sort(key=lambda item: item.score, reverse=True)
        return chunks[:top_k]

真实线上系统里,一般会采用:

  • 召回阶段 Top50 或 Top100
  • 精排后截断到 Top5 或 Top8

这个比例比较常见,因为:

  • 召回太少,容易漏信息
  • 上下文太多,会让模型分心、延迟上升、成本变高

十、上下文构建:决定生成质量的最后一道关

很多时候,模型答偏并不是模型不行,而是上下文构建做得差。

10.1 上下文构建原则

  • 不拼冗余内容
  • 不拼低信息密度内容
  • 不拼冲突且未标记来源的内容
  • 给每段上下文编号,方便引用
  • 控制总 Token,预留回答空间

10.2 生产级 Context Builder

# app/application/context_service.py
from __future__ import annotations

from dataclasses import dataclass
import tiktoken

from app.config import settings
from app.domain.models import RetrievedChunk

@dataclass(slots=True)
class PromptPackage:
    system_prompt: str
    user_prompt: str
    citations: list[dict]

class ContextService:
    def __init__(self) -> None:
        self.encoding = tiktoken.get_encoding("cl100k_base")

    def build_prompt(self, query: str, chunks: list[RetrievedChunk]) -> PromptPackage:
        system_prompt = (
            "你是企业级 AI 搜索助手。"
            "请严格基于参考资料回答,不要编造。"
            "如果资料不足,请明确指出不确定。"
            "回答结论后标注引用编号,例如[1][2]。"
            "最后附上“参考来源”列表。"
        )

        token_budget = settings.max_context_tokens
        consumed = 0
        context_blocks = []
        citations = []

        for idx, chunk in enumerate(chunks, start=1):
            block = f"[{idx}] 标题:{chunk.title}\n来源:{chunk.url or chunk.doc_id}\n内容:{chunk.content.strip()}"
            block_tokens = len(self.encoding.encode(block))

            if consumed + block_tokens > token_budget:
                break

            consumed += block_tokens
            context_blocks.append(block)
            citations.append(
                {
                    "index": idx,
                    "title": chunk.title,
                    "url": str(chunk.url) if chunk.url else None,
                    "source": chunk.source.value,
                }
            )

        user_prompt = (
            f"用户问题:\n{query}\n\n"
            f"参考资料:\n{'\n\n'.join(context_blocks)}\n\n"
            "请输出:\n"
            "1. 先给结论\n"
            "2. 再给关键依据\n"
            "3. 如果存在不确定性,单独说明\n"
            "4. 文末列出参考来源"
        )

        return PromptPackage(
            system_prompt=system_prompt,
            user_prompt=user_prompt,
            citations=citations,
        )

10.3 为什么要预留回答空间

如果把上下文塞满整个窗口,会有三个副作用:

  • 模型几乎没有余量生成答案
  • 推理延迟显著上升
  • 重要信息被边缘化

一般经验:

  • 60% 给参考资料
  • 10% 给 system prompt 和格式约束
  • 30% 留给最终回答

这是比“能塞多长塞多长”更稳定的策略。


十一、LLM 推理服务:流式输出、并发控制、超时治理

11.1 Ollama 客户端封装

# app/infrastructure/llm/ollama_client.py
from __future__ import annotations

import asyncio
import json
from collections.abc import AsyncIterator

import httpx

from app.config import settings

class OllamaClient:
    def __init__(self) -> None:
        self._client = httpx.AsyncClient(
            timeout=httpx.Timeout(settings.ollama_timeout_seconds, connect=5.0),
            limits=httpx.Limits(max_connections=50, max_keepalive_connections=20),
        )
        self._semaphore = asyncio.Semaphore(settings.llm_max_concurrency)

    async def chat_stream(self, system_prompt: str, user_prompt: str) -> AsyncIterator[str]:
        payload = {
            "model": settings.ollama_chat_model,
            "stream": True,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "options": {
                "temperature": 0.2,
                "top_p": 0.9,
                "num_predict": 1024,
            },
        }

        async with self._semaphore:
            async with self._client.stream("POST", f"{settings.ollama_base_url}/api/chat", json=payload) as resp:
                resp.raise_for_status()
                async for line in resp.aiter_lines():
                    if not line.strip():
                        continue
                    data = json.loads(line)
                    if data.get("done"):
                        break
                    message = data.get("message", {})
                    content = message.get("content")
                    if content:
                        yield content

    async def close(self) -> None:
        await self._client.aclose()

11.2 这里最容易犯的几个错

错误 1:用“请求数”当并发控制

LLM 的瓶颈不是 HTTP 请求数,而是模型推理资源。

正确做法是:

  • HTTP 层可以高并发
  • 生成层必须显式限流

错误 2:没有连接池和 keepalive

如果每次都新建连接,TTFT 会被网络和握手拖慢。

错误 3:把搜索和推理绑定死

外部搜索偶发失败时,不应该整个系统不可用。
应该允许“无外部搜索降级回答”或者“仅内部知识库回答”。


十二、编排器:把一切串起来的关键服务

# app/application/orchestrator.py
from __future__ import annotations

import time
from collections.abc import AsyncIterator

from app.domain.models import SearchRequest

class SearchOrchestrator:
    def __init__(self, rewrite_service, retrieval_service, context_service, answer_service):
        self.rewrite_service = rewrite_service
        self.retrieval_service = retrieval_service
        self.context_service = context_service
        self.answer_service = answer_service

    async def stream_answer(self, request: SearchRequest) -> AsyncIterator[str]:
        started_at = time.perf_counter()
        degraded = False

        rewritten = self.rewrite_service.rewrite(request.query)

        chunks = await self.retrieval_service.retrieve(
            query=rewritten.original_query,
            search_queries=rewritten.search_queries,
            top_k=request.top_k,
            enable_web_search=request.enable_web_search,
            enable_internal_search=request.enable_internal_search,
        )

        if not chunks:
            degraded = True

        prompt = self.context_service.build_prompt(request.query, chunks)

        async for token in self.answer_service.answer_stream(prompt):
            yield f"data: {token}\n\n"

        latency_ms = int((time.perf_counter() - started_at) * 1000)
        tail_payload = {
            "type": "final",
            "latency_ms": latency_ms,
            "degraded": degraded,
            "citations": prompt.citations,
        }
        yield f"data: {tail_payload}\n\n"
        yield "data: [DONE]\n\n"

这个 orchestrator 的设计目标很明确:

  • 上层 API 不关心底层实现
  • 底层组件都可以独立替换
  • 整个请求链路可以打通 trace_id 和 metrics

十三、API 层实现:支持 SSE 流式输出

# app/api/routes_search.py
from __future__ import annotations

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.api.deps import get_orchestrator
from app.domain.models import SearchRequest

router = APIRouter(prefix="/api/v1", tags=["search"])

@router.post("/search")
async def search(request: SearchRequest, orchestrator=Depends(get_orchestrator)):
    stream = orchestrator.stream_answer(request)
    return StreamingResponse(
        stream,
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )

为什么生产里更推荐 SSE?

  • 实现简单
  • 浏览器原生支持好
  • 对“服务端单向推流”场景足够
  • 比 WebSocket 更适合问答输出

只有当你需要:

  • 双向实时控制
  • 中途打断
  • 多路事件交互

才更建议上 WebSocket。


十四、生产级缓存设计

14.1 两级缓存实现思路

# app/infrastructure/cache/memory_cache.py
from __future__ import annotations

import time

class MemoryCache:
    def __init__(self, max_size: int = 1024):
        self.max_size = max_size
        self.store: dict[str, tuple[float, object]] = {}

    def get(self, key: str, ttl_seconds: int):
        item = self.store.get(key)
        if not item:
            return None
        saved_at, value = item
        if time.time() - saved_at > ttl_seconds:
            self.store.pop(key, None)
            return None
        return value

    def set(self, key: str, value: object):
        if len(self.store) >= self.max_size:
            oldest_key = min(self.store.keys(), key=lambda item: self.store[item][0])
            self.store.pop(oldest_key, None)
        self.store[key] = (time.time(), value)
# app/infrastructure/cache/redis_cache.py
from __future__ import annotations

import json
import redis.asyncio as redis

class RedisCache:
    def __init__(self, redis_url: str):
        self.client = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)

    async def get_json(self, key: str):
        value = await self.client.get(key)
        return json.loads(value) if value else None

    async def set_json(self, key: str, value: dict | list, ttl_seconds: int):
        await self.client.setex(key, ttl_seconds, json.dumps(value, ensure_ascii=False))

14.2 什么值得缓存

最值得缓存的是:

  • 搜索结果
  • 网页正文抽取结果
  • 热点问题最终答案
  • Query Rewrite 结果

不建议长期缓存的是:

  • 强时效问题答案
  • 权限敏感上下文
  • 带用户个性化偏好的最终生成结果

14.3 缓存穿透与击穿怎么防

至少做三件事:

  • 空结果短 TTL 缓存
  • 热 key singleflight 合并
  • 随机 TTL 抖动避免雪崩

十五、可观测性建设:没有观测就没有生产系统

15.1 关键指标

建议采集以下指标。

用户体验指标

  • QPS
  • P50/P95/P99 总延迟
  • TTFT 首 Token 延迟
  • 每次回答平均 Token 数
  • 成功率

检索质量指标

  • 每次查询召回数
  • Rerank 前后命中分布
  • 最终引用来源数
  • 空召回率
  • 回答拒答率

系统资源指标

  • GPU 使用率
  • 显存使用率
  • CPU 和内存
  • Redis 命中率
  • 外部搜索超时率

15.2 Prometheus 指标示例

# app/infrastructure/observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge

request_total = Counter(
    "ai_search_request_total",
    "Total count of ai search requests",
    ["tenant_id", "status"],
)

request_latency = Histogram(
    "ai_search_request_latency_ms",
    "Latency of ai search request in ms",
    buckets=(50, 100, 300, 500, 1000, 2000, 3000, 5000, 10000),
)

llm_inflight = Gauge(
    "ai_search_llm_inflight",
    "Current inflight requests of llm generation",
)

15.3 为什么要记录检索中间态

线上排查时最痛苦的不是报错,而是“答得不准但也没报错”。

所以建议保存:

  • 原始 query
  • 改写后的 query
  • 各路召回结果
  • 精排前后排序
  • 最终上下文片段
  • 生成答案

当然要注意脱敏和审计权限。

有了这些信息,后续做离线评测、效果回放、Prompt 调优会轻松很多。


十六、离线评测体系:没有评测,就无法持续优化

很多团队做 AI 搜索时,只看“感觉好不好”。这在早期可以,但系统想持续迭代,必须有评测集。

16.1 建议评测维度

  • Answer Relevance:答案是否回答了问题
  • Context Precision:召回内容是否真的相关
  • Faithfulness:答案是否忠于上下文
  • Citation Accuracy:引用是否正确
  • Latency:延迟是否可接受

16.2 实践建议

建立一批高价值问答对:

  • 新人 onboarding 常问问题
  • 线上事故排障问题
  • 业务制度和流程问题
  • 高频客服问题

每次改 Prompt、改模型、改召回策略,都跑一轮离线评测。
这比“我感觉这次更聪明了”可靠得多。


十七、生产级部署方案

17.1 Docker Compose 版本

单机或小团队验证环境,推荐先用 Compose。

version: "3.9"

services:
  api:
    build: .
    restart: always
    ports:
      - "8000:8000"
    environment:
      OLLAMA_BASE_URL: http://ollama:11434
      REDIS_URL: redis://redis:6379/0
      SEARXNG_BASE_URL: http://searxng:8080
    depends_on:
      - ollama
      - redis
      - searxng

  ollama:
    image: ollama/ollama:latest
    restart: always
    ports:
      - "11434:11434"
    volumes:
      - ollama_data:/root/.ollama
    environment:
      OLLAMA_KEEP_ALIVE: "24h"
      OLLAMA_NUM_PARALLEL: "4"
      OLLAMA_MAX_LOADED_MODELS: "2"

  redis:
    image: redis:7-alpine
    restart: always
    command: redis-server --appendonly yes --maxmemory 2gb --maxmemory-policy allkeys-lru
    ports:
      - "6379:6379"

  searxng:
    image: searxng/searxng:latest
    restart: always
    ports:
      - "8080:8080"

volumes:
  ollama_data:

17.2 Kubernetes 版本需要关注什么

到了 K8s,重点不是“能跑起来”,而是这些问题:

  • GPU 节点调度
  • 模型服务与 API 服务分离扩容
  • HPA 指标选择
  • 冷启动和模型预热
  • Pod 优雅终止,避免流式连接中断
  • ConfigMap / Secret 管理

推荐拆分为:

  • api-service
  • retrieval-service
  • rerank-service
  • llm-service
  • ingestion-job

这样可以按资源特征独立伸缩。


十八、两个真实业务场景,看看架构如何落地

18.1 场景一:企业内部知识问答

问题示例:

“支付服务出现下游超时,标准排查路径是什么?”

系统执行路径:

  1. Query Rewrite 把“标准排查路径”改写为“runbook / troubleshooting”
  2. 内部知识库 BM25 命中《支付超时故障排查手册》
  3. 向量检索召回《网关重试风暴事故复盘》
  4. Rerank 发现 runbook 文档优先级最高
  5. Context Builder 选取 SOP 中的关键步骤和阈值
  6. LLM 输出结论并引用内部文档片段

这个场景里:

  • 公网搜索不是主角
  • 内部知识库质量决定最终效果
  • 权限过滤必须生效

18.2 场景二:本地技术研究助手

问题示例:

“为什么 Java 应用在 Kubernetes 里内存看起来比 Xmx 大很多?”

系统执行路径:

  1. 外部搜索召回 JVM、容器内存、page cache 等公开资料
  2. 内部知识库召回本团队的 K8s JVM 调优手册
  3. 混合重排后,将“原理解释 + 团队标准实践”一起送入上下文
  4. 最终答案既解释原理,也给出组织内推荐排查路径

这个场景的价值在于:

  • 外部资料提供广度
  • 内部资料提供落地性
  • AI 输出把两者结合起来

这正是“本地 AI 搜索引擎”比“纯聊天机器人”更有价值的地方。


十九、常见架构误区

误区 1:把大模型当搜索引擎

模型不具备稳定、可控、可追踪的检索能力。
搜索一定要由检索系统完成,模型负责理解和组织答案。

误区 2:只做向量检索,不做关键词检索

很多企业文档里有:

  • 订单号
  • 错误码
  • 配置项
  • 类名
  • 接口名

这些内容向量检索不一定强,关键词检索必须保留。

误区 3:只看回答流畅度,不看忠实度

最危险的答案,往往不是“答不上来”,而是“答得很像对的”。

误区 4:把 Prompt 当成唯一优化手段

在 AI 搜索里,通常优先级更高的是:

  1. 改善召回
  2. 做好重排
  3. 控制上下文
  4. 最后才是微调 Prompt

误区 5:把所有逻辑塞进一个 Python 文件

前期快,后期一定难维护。
检索、上下文、生成、缓存、观测最好从一开始就解耦。


二十、性能优化路线图

如果你准备把这套系统从 PoC 推向线上,建议按下面顺序优化。

第一阶段:先保证正确性

  • 加入引用
  • 限制幻觉
  • 建立离线评测集
  • 打通内部知识库入库链路

第二阶段:再优化延迟

  • 并行多路召回
  • 模型常驻显存
  • 抽取结果缓存
  • 热点问题答案缓存
  • 精简上下文

第三阶段:再提升吞吐

  • 拆分检索服务和生成服务
  • 对 LLM 层做并发门控
  • 做租户限流
  • 扩展多实例和负载均衡

第四阶段:最后优化演进能力

  • 多模型灰度
  • 多索引版本切换
  • 自动评测
  • A/B Test

这条路线比“一上来就上最复杂架构”更现实。


二十一、最小可运行示例的依赖清单

fastapi>=0.115.0
uvicorn[standard]>=0.30.0
httpx>=0.27.0
pydantic>=2.8.0
pydantic-settings>=2.3.0
redis[hiredis]>=5.0.0
tiktoken>=0.7.0
trafilatura>=1.12.0
beautifulsoup4>=4.12.0
prometheus-client>=0.20.0

如果你计划继续往生产推进,建议再引入:

  • PostgreSQL:审计与回放
  • OpenTelemetry:trace
  • Milvus / OpenSearch / pgvector:更稳的检索底座
  • Kafka:异步入库和索引更新

二十二、一份更完整的回答服务实现

为了让整条链路闭环,这里补一版 AnswerService

# app/application/answer_service.py
from __future__ import annotations

from collections.abc import AsyncIterator

class AnswerService:
    def __init__(self, ollama_client):
        self.ollama_client = ollama_client

    async def answer_stream(self, prompt_package) -> AsyncIterator[str]:
        async for token in self.ollama_client.chat_stream(
            system_prompt=prompt_package.system_prompt,
            user_prompt=prompt_package.user_prompt,
        ):
            yield token

它看起来简单,但这是好事。
因为复杂度已经被前面的分层吸收掉了。

如果后续要加:

  • 安全审查
  • 敏感词检测
  • 输出后处理
  • Markdown 转富文本

都可以在这个服务里扩展,而不污染其它层。


二十三、如果你要继续做生产化,下一步应该补什么

这篇文章已经给出了一套能支撑中小规模生产落地的主干方案。
如果继续向企业级演进,建议优先补以下能力:

1. 文档权限过滤

让召回阶段就遵守 ACL,而不是回答后再处理。

2. 索引版本管理

支持双索引切换、灰度回滚、增量重建。

3. 对话记忆摘要

多轮问答时,不要把整个历史硬拼进上下文,要做摘要压缩。

4. 结构化答案输出

让模型输出 JSON Schema,方便前端渲染卡片、来源、置信度。

5. RAG 评测平台

把评测变成日常工程动作,而不是临时脚本。


二十四、全文总结

从架构视角看,本地 AI 搜索引擎的关键,不是“把 Ollama 跑起来”,而是把下面四件事真正做好:

  1. 检索做对:混合召回、查询改写、精排去重、权限过滤
  2. 生成做稳:上下文预算、引用约束、流式输出、降级策略
  3. 工程做实:缓存、限流、熔断、观测、评测、回放
  4. 架构做活:模型解耦、索引解耦、服务拆分、可灰度演进

如果只做 Demo,你会得到一个“偶尔惊艳”的系统。
如果按本文的思路做工程化升级,你更有机会得到一个“长期可维护、可持续优化、能真正服务业务”的生产级 AI 搜索平台。


附:一套推荐的技术选型组合

如果你希望快速落地,同时保留后续扩展空间,我会推荐下面这套组合:

能力域 推荐选型 原因
API 服务 FastAPI 异步友好,适合 SSE
本地推理 Ollama 本地部署简单,生态成熟
Embedding BGE-M3 / bge-small-zh 中文效果稳定
Rerank BGE Reranker / BCE Reranker 精排性价比高
公网搜索 SearXNG 自托管、可控
向量存储 pgvector / Milvus 从轻量到规模化都可覆盖
关键词检索 PostgreSQL FTS / OpenSearch 对错误码、术语、类名更友好
缓存 Redis TTL、分布式、生态成熟
观测 Prometheus + Grafana + OTel 生产必备
编排 Python AsyncIO 开发效率和 I/O 并发都不错

如果是我来落地这类系统,通常会分三步走:

  • 第一步:FastAPI + Ollama + SearXNG + Redis,先跑通主链路
  • 第二步:补向量检索、Rerank、离线入库和评测
  • 第三步:拆服务、上 K8s、做权限和多租户

这个演进路径足够务实,也足够贴近真实项目。




上一篇:企业级 AI Agent 架构实践:如何构建可治理的 Skill 核心系统
下一篇:华为τ缩放理论全解:LogicFolding实现55%晶体管密度跃升
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-26 05:25 , Processed in 0.625822 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表