本系列旨在介绍增强现代智能体系统可靠性的设计模式,我们将以直观方式逐一剖析每个概念,拆解其设计目的,并实现简单可行的代码版本,演示其如何融入现实世界的智能体系统。本系列共包含14篇文章,此为第13篇。原文:Building the 14 Key Pillars of Agentic AI

在构建健壮的智能体解决方案时,软件工程的智慧至关重要,它确保各个组件能够协调、并行运行,并与整个系统高效交互。例如,预测执行通过尝试处理可预测的查询来降低时延;而冗余执行则是对同一智能体重复执行多次,以防单点故障。
除了上述模式,还有许多其他设计模式可以增强现代智能体系统的可靠性,例如:
- 并行工具处理:智能体同时执行独立的API调用以隐藏I/O时延。
- 层级智能体团队:管理者将任务拆分为由专门执行智能体处理的小步骤。
- 竞争性智能体组合:多个智能体同时提出答案,由系统择优选取。
- 冗余执行:两个或多个智能体解决同一任务以检测错误并提高可靠性。
- 并行检索与混合搜索融合:多种检索策略协同运行以提升上下文质量。
- 多跳检索:智能体通过迭代检索步骤收集更深入、更相关的信息。
本系列将聚焦于实现这些常见智能体设计模式背后的基础概念。我们将逐一介绍每个概念,拆解其目的,并通过实现一个简单可行的版本来演示其如何融入现实世界的智能体系统。
所有相关理论和代码均可在GitHub仓库中找到:🤖 Agentic Parallelism: A Practical Guide 🚀
代码库结构如下:
agentic-parallelism/
├── 01_parallel_tool_use.ipynb
├── 02_parallel_hypothesis.ipynb
...
├── 06_competitive_agent_ensembles.ipynb
├── 07_agent_assembly_line.ipynb
├── 08_decentralized_blackboard.ipynb
...
├── 13_parallel_context_preprocessing.ipynb
└── 14_parallel_multi_hop_retrieval.ipynb
并行上下文预处理以提升准确度
之前我们探索的RAG(检索增强生成)模式主要集中于改进初始检索步骤,以找到更多正确的文档。而并行上下文预处理则聚焦于检索之后发生的事情。
一个最大化召回率的常见策略是检索大量候选文档(例如 k=10 或更多)。然而,直接将这个庞大且通常充满噪声的文档集合输入到最终生成器LLM的上下文窗口中会带来一系列问题。

这样做不仅速度慢、成本高(需要处理更多令牌),更重要的是,大量无关信息会“淹没”模型,损害其生成答案的准确性,即所谓的“中间丢失”问题。
解决方案是引入一个中间的“蒸馏”步骤。在检索到大量候选文档后,我们使用多个、小型的并行LLM调用来处理它们。每个调用都充当一个高度专注的过滤器,检查单个文档与特定问题的相关性。只有通过此项检查的文档才会被包含在最终的、经过“蒸馏”的上下文中,并发送给主生成器。
接下来,我们将构建并比较两个RAG系统:一个使用大型原始上下文,另一个使用并行预处理步骤,以展示可衡量的改进。
首先,我们需要定义用于结构化并行“蒸馏器”代理输出的Pydantic模型。
from langchain_core.pydantic_v1 import BaseModel, Field
class RelevancyCheck(BaseModel):
"""蒸馏/过滤的结构化输出 Pydantic 模型"""
# 对文档相关性进行明确的二元决策。
is_relevant: bool = Field(description="True if the document contains information that directly helps answer the question.")
# 对该决定的简明解释
brief_explanation: str = Field(description="A one-sentence explanation of why the document is or is not relevant.")
RelevancyCheck 模型是蒸馏器代理的“合约”。通过强制每个并行调用返回一个 is_relevant 布尔值,我们创建了一个快速、可靠的过滤机制。brief_explanation 字段对于调试和理解文档被包含或排除的原因非常有价值。
接下来,我们定义 GraphState 以及高级系统的核心节点:distill_context_node。该节点将负责协调并行预处理。
from typing import TypedDict, List
from langchain_core.documents import Document
from concurrent.futures import ThreadPoolExecutor, as_completed
class RAGGraphState(TypedDict):
question: str
raw_docs: List[Document]
distilled_docs: List[Document]
final_answer: str
# 针对每个文档并行运行的链
distiller_prompt = ChatPromptTemplate.from_template(
"Given the user‘s question, determine if the following document is relevant for answering it. "
"Provide a brief explanation.\n\n"
"Question: {question}\n\nDocument:\n{document}"
)
distiller_chain = distiller_prompt | llm.with_structured_output(RelevancyCheck)
def distill_context_node(state: RAGGraphState):
"""该模式的核心:并行扫描所有检索到的文档,以过滤相关性"""
print(f"--- [Distiller] Pre-processing {len(state['raw_docs'])} raw documents in parallel... ---")
relevant_docs = []
# 通过 ThreadPoolExecutor 在每个文档上并发运行 ‘distiller_chain’
with ThreadPoolExecutor(max_workers=5) as executor:
# 为每个要检查的文档创建一个future
future_to_doc = {executor.submit(distiller_chain.invoke, {"question": state['question'], "document": doc.page_content}): doc for doc in state['raw_docs']}
for future in as_completed(future_to_doc):
doc = future_to_doc[future]
try:
result = future.result()
# 如果蒸馏剂将文件标记为相关的,就保留
if result.is_relevant:
print(f" - Doc '{doc.metadata['source']}' IS relevant. Reason: {result.brief_explanation}")
relevant_docs.append(doc)
else:
# 否则就丢弃
print(f" - Doc '{doc.metadata['source']}' is NOT relevant. Reason: {result.brief_explanation}")
except Exception as e:
print(f"Error processing doc {doc.metadata['source']}: {e}")
print(f"--- [Distiller] Distilled context down to {len(relevant_docs)} documents. ---")
return {"distilled_docs": relevant_docs}
distill_context_node 是系统的“质量控制”节点。在高召回率的检索步骤获取了大量 raw_docs 之后,此节点充当过滤器。它利用 ThreadPoolExecutor 将工作分散,将每个文档发送给其自身的小型、独立LLM进行评估。这种并行处理至关重要,意味着处理10个文档的时间大致与处理1个文档的时间相当。随后,该节点仅收集那些符合 is_relevant 标准的文档,为最终生成器产生一个更小、更干净、信息浓度更高的 distilled_docs 列表。
然后,我们组装完整的工作流图,将新的 distill 节点放置在 retrieve 和 generate 步骤之间。
from langgraph.graph import StateGraph, END
workflow = StateGraph(RAGGraphState)
# 给流水线添加三个节点
workflow.add_node("retrieve", retrieval_node)
workflow.add_node("distill", distill_context_node)
workflow.add_node("generate", generation_node)
# 定义线性工作流:检索->提取->生成
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "distill")
workflow.add_edge("distill", "generate")
workflow.add_edge("generate", END)
advanced_rag_app = workflow.compile()

最后,我们进行直接的对比分析。对同一查询分别运行简单(大上下文)和高级(蒸馏上下文)的RAG系统,并仔细比较两者的准确性、延迟和令牌开销。
import tiktoken
def count_tokens(text: str) -> int:
"""为成本分析计算令牌的辅助函数"""
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
# --- 分析设置 ---
context_tokens_simple = count_tokens(context_simple)
context_tokens_advanced = count_tokens(context_advanced)
token_improvement = (context_tokens_simple - context_tokens_advanced) / context_tokens_simple * 100
latency_improvement = (gen_time_simple - gen_time_advanced) / gen_time_simple * 100
# --- 打印结果 ---
print("="*60)
print(" ACCURACY & QUALITY ANALYSIS")
print("="*60 + "\n")
print("**Simple RAG‘s Answer (from Large, Noisy Context):**")
print(f'"{simple_answer}"\n')
print("**Advanced RAG‘s Answer (from Distilled, Focused Context):**")
print(f'"{advanced_answer}"\n')
print("="*60)
print(" LATENCY & COST (TOKEN) ANALYSIS")
print("="*60 + "\n")
print("| Metric | Simple RAG (Large Context) | Advanced RAG (Distilled Context) | Improvement |")
print("|-----------------------------|----------------------------|----------------------------------|-------------|")
print(f"| Context Size (Tokens) | {context_tokens_simple:<26} | {context_tokens_advanced:<32} | **-{token_improvement:.0f}%** |")
print(f"| Final Generation Time | {gen_time_simple:<24.2f} seconds | {gen_time_advanced:<32.2f} seconds | **-{latency_improvement:.0f}%** |")
分析结果如下:
============================================================
ACCURACY & QUALITY ANALYSIS
============================================================
**Simple RAG Answer (from Large, Noisy Context):**
"Based on the context, a power supply unit of at least 1200W is recommended for the QLeap-V4 processor. The QLeap-V3 chip had a recommended power supply of 800W."
**Advanced RAG Answer (from Distilled, Focused Context):**
"Based on the provided context, a power supply unit of at least 1200W is recommended for the QLeap-V4 processor."
**Analysis:** The Simple RAG answer, while technically correct, includes irrelevant information about the previous generation product (QLeap-V3). This happened because the large, noisy context included documents about both products. The Advanced RAG answer is **more accurate and precise**. The parallel distillation step correctly filtered out the irrelevant document about the QLeap-V3, providing a clean, focused context to the generator, which then produced a perfect, concise answer.
============================================================
LATENCY & COST (TOKEN) ANALYSIS
============================================================
| Metric | Simple RAG (Large Context) | Advanced RAG (Distilled Context) | Improvement |
|-----------------------------|----------------------------|----------------------------------|-------------|
| Context Size (Tokens) | 284 | 29 | **-90%** |
| Final Generation Time | 7.89 seconds | 2.15 seconds | **-73%** |
最终分析提供了清晰、数据驱动的结论,表明并行上下文预处理模式带来了三重显著改进:
- 更高的准确性:定性分析显示,高级系统产出了更精确、更聚焦的答案。通过过滤掉关于旧型号“QLeap-V3”的干扰性文档,蒸馏步骤防止了最终生成器混杂不相关信息,直接提升了答案质量。
- 更低的成本:输入到最终、更昂贵的生成器的上下文令牌数量大幅减少了90%。在一个处理数百万查询的生产系统中,这将直接转化为LLM推理成本的显著节约。
- 更低的延迟:上下文大小的减少直接影响了最终生成步骤的性能,使其速度提高了73%。虽然蒸馏步骤本身会引入一些开销,但这些开销通常会被最终、计算最密集步骤所节省的时间所抵消,从而使用户的整体响应时间更快。
通过实现这种并行预处理模式,我们有效地在智能体流水线中引入了一个智能的、并行的“质量门”,在不牺牲召回率的前提下,显著提升了生成答案的精确度、成本效益和响应速度。想深入探讨更多类似的系统架构优化技巧,欢迎访问云栈社区与其他开发者交流。