This series introduces the key design patterns that make modern agentic systems more reliable. We break each concept down intuitively, explain its design goals, and show how it fits into real-world agentic systems through simple, working code. This is the 7th of 14 articles in the series; the original text is Building the 14 Key Pillars of Agentic AI[^1].

Building robust agentic solutions requires sound software engineering, so that individual components run in coordination and in parallel while interacting efficiently with the rest of the system. For example, speculative execution[^2] pre-processes predictable queries to reduce system latency, while redundant execution[^3] launches multiple agent instances for the same task to guard against single points of failure. Beyond these, the field has produced a range of design patterns aimed at improving the reliability of agentic systems, for example:
- Parallel tool calls: the agent issues multiple independent API calls at once to hide I/O wait time (a minimal sketch follows this list).
- Hierarchical agents: a manager agent breaks a complex task into smaller steps and delegates them to different worker agents.
- Competitive agent ensembles: several agents each propose an answer to the same problem, and the system selects the best one.
- Parallel and hybrid retrieval: multiple retrieval strategies run simultaneously to improve the quality and relevance of the recalled context.
- Multi-hop retrieval: the agent gathers deeper, more precise information through iterative retrieval steps.
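To make the first of these concrete, here is a minimal sketch of parallel tool calls using asyncio.gather; the two tool coroutines are hypothetical stand-ins for real I/O-bound API calls:

```python
import asyncio

# Hypothetical tool stubs standing in for real I/O-bound API calls.
async def fetch_weather(city: str) -> str:
    await asyncio.sleep(1)  # simulate network latency
    return f"Weather in {city}: sunny"

async def fetch_stock(ticker: str) -> str:
    await asyncio.sleep(1)  # simulate network latency
    return f"{ticker}: $123.45"

async def main():
    # Both calls run concurrently, so the total wait is ~1s instead of ~2s.
    weather, stock = await asyncio.gather(
        fetch_weather("Berlin"), fetch_stock("ACME")
    )
    print(weather, stock)

asyncio.run(main())
```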
Of course, there are far more patterns than these. The goal of this series is to implement, in code, the core concepts behind the most common AI agent patterns. All of the theory and complete code can be found in the GitHub repository: 🤖 Agentic Parallelism: A Practical Guide 🚀[^4].
The repository is structured as follows:
agentic-parallelism/
├── 01_parallel_tool_use.ipynb
├── 02_parallel_hypothesis.ipynb
...
├── 06_competitive_agent_ensembles.ipynb
├── 07_agent_assembly_line.ipynb
├── 08_decentralized_blackboard.ipynb
...
├── 13_parallel_context_preprocessing.ipynb
└── 14_parallel_multi_hop_retrieval.ipynb
The Agent Assembly Line for High Throughput
The parallel patterns we have explored so far (such as parallel tool calls and parallel hypothesis generation and evaluation) all focus on reducing the latency of a single complex task, so that the agent responds to an individual user query faster and more intelligently.
But what if the challenge is not the complexity of any one task, but the need to process a large volume of tasks continuously? For many production applications, the metric that matters most is not how quickly a single request is handled, but how many tasks the system can complete per hour: its throughput.

This is where the Agent Assembly Line architecture shines. The pattern shifts the design focus from minimizing latency to maximizing throughput.
The core idea: instead of one monolithic agent processing each task serially from start to finish, split the process into a series of specialized "workstations". The moment an item is finished at one station, it is handed to the next, and all stations work in parallel, each handling items at a different stage of the pipeline.
Next, we will build a three-stage pipeline that processes a batch of product reviews. Our goal is to show, through careful timing analysis, that this parallel pipeline architecture processes significantly more reviews per second than the traditional sequential approach.
First, define the data structures that represent a review. These objects flow along the assembly line, and each workstation progressively enriches them.
from pydantic import BaseModel, Field
from typing import List, Literal, Optional

class TriageResult(BaseModel):
    """Structured output of the initial triage station."""
    category: Literal["Feedback", "Bug Report", "Support Request", "Irrelevant"] = Field(description="The category of the review.")

class Summary(BaseModel):
    """Structured output of the summarization station."""
    summary: str = Field(description="A one-sentence summary of the key feedback in the review.")

class ExtractedData(BaseModel):
    """Structured output of the data extraction station."""
    product_mentioned: str = Field(description="The specific product the review is about.")
    sentiment: Literal["Positive", "Negative", "Neutral"] = Field(description="The overall sentiment of the review.")
    key_feature: str = Field(description="The main feature or aspect discussed in the review.")

class ProcessedReview(BaseModel):
    """The final, fully processed review object that accumulates data from every station."""
    original_review: str
    category: str
    summary: Optional[str] = None
    extracted_data: Optional[ExtractedData] = None
These Pydantic models act as the "standardized containers" of the assembly line. The ProcessedReview object is the central data carrier: it is created at the first workstation (triage) and then progressively filled in (summary, extracted_data) as it passes through the later stations. This guarantees a consistent data contract at every stage of the pipeline.
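The station chains invoked below (triage_chain, summarizer_chain, extractor_chain) are defined in the accompanying notebook but not shown in this excerpt. A minimal sketch of what they might look like, assuming an OpenAI chat model and LangChain's with_structured_output API (the model choice and prompt wording are my assumptions):

```python
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# Assumed model; the original notebook may use a different one.
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Each chain formats a prompt, calls the LLM, and parses the reply
# into the matching Pydantic model via structured output.
triage_chain = ChatPromptTemplate.from_template(
    "Classify the following product review.\n\nReview: {review_text}"
) | llm.with_structured_output(TriageResult)

summarizer_chain = ChatPromptTemplate.from_template(
    "Summarize the key feedback in this review in one sentence.\n\nReview: {review_text}"
) | llm.with_structured_output(Summary)

extractor_chain = ChatPromptTemplate.from_template(
    "Extract the product, sentiment, and key feature from this summary.\n\nSummary: {summary_text}"
) | llm.with_structured_output(ExtractedData)
```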
Next, define the GraphState (here named PipelineState), which operates on an entire batch of reviews.
from typing import TypedDict, Annotated, List
import operator

class PipelineState(TypedDict):
    # 'initial_reviews' holds the raw incoming review strings
    initial_reviews: List[str]
    # 'processed_reviews' is the list of ProcessedReview objects,
    # built up as the batch moves through the pipeline
    processed_reviews: List[ProcessedReview]
    # per-station timing logs, merged across nodes via operator.add
    performance_log: Annotated[List[str], operator.add]
PipelineState is designed for batch processing. The entire assembly line is invoked once with the initial_reviews list, and its final output is the complete processed_reviews list.
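A note on the Annotated[List[str], operator.add] channel: the reducer tells LangGraph how to merge each node's partial update into the shared state, which for lists is plain concatenation. A tiny illustration of what the framework does with it:

```python
import operator

# Each node returns only its own log entry; LangGraph merges it into the
# channel's existing value using the declared reducer (list concatenation).
existing = ["[Triage] Processed 10 reviews in 7.80s."]
update = ["[Summarizer] Processed 6 reviews in 6.10s."]
assert operator.add(existing, update) == existing + update
```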
Now define a node function for each workstation on the line. A key implementation detail: every node uses a ThreadPoolExecutor to process all the items assigned to its stage in parallel.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm

MAX_WORKERS = 4  # controls the degree of parallelism at each station

# Station 1: the triage node
def triage_node(state: PipelineState):
    """Station 1: categorize all initial reviews in parallel."""
    print(f"--- [Station 1: Triage] Processing {len(state['initial_reviews'])} reviews... ---")
    start_time = time.time()
    triaged_reviews = []
    # Use a ThreadPoolExecutor to make one LLM call per review in parallel
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Create a future for each review to be triaged
        future_to_review = {executor.submit(triage_chain.invoke, {"review_text": review}): review for review in state['initial_reviews']}
        for future in tqdm(as_completed(future_to_review), total=len(state['initial_reviews']), desc="Triage Progress"):
            original_review = future_to_review[future]
            try:
                result = future.result()
                # Create the initial ProcessedReview object
                triaged_reviews.append(ProcessedReview(original_review=original_review, category=result.category))
            except Exception as exc:
                print(f'Review generated an exception: {exc}')
    execution_time = time.time() - start_time
    log = f"[Triage] Processed {len(state['initial_reviews'])} reviews in {execution_time:.2f}s."
    print(log)
    # The node's output is the initial list of processed reviews
    return {"processed_reviews": triaged_reviews, "performance_log": [log]}
triage_node is the entry point of the assembly line. The key to its design is the ThreadPoolExecutor: the node submits all reviews to triage_chain at once, and the as_completed iterator yields each result the moment its LLM call finishes. This lets us build the triaged_reviews list efficiently, so the station's wall-clock time is governed by its slowest few LLM calls rather than the sum of all call durations.
The subsequent summarize_node and extract_data_node follow the same parallel pattern: first filter out the items the station is responsible for, then process them in parallel.
# Station 2: the summarization node
def summarize_node(state: PipelineState):
    """Station 2: filter the feedback reviews and summarize them in parallel."""
    # This station only processes reviews categorized as "Feedback"
    feedback_reviews = [r for r in state['processed_reviews'] if r.category == "Feedback"]
    if not feedback_reviews:
        print("--- [Station 2: Summarizer] No feedback reviews to process. Skipping. ---")
        return {}
    print(f"--- [Station 2: Summarizer] Processing {len(feedback_reviews)} feedback reviews... ---")
    start_time = time.time()
    # Use a map for convenient in-place updates of the review objects
    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(summarizer_chain.invoke, {"review_text": r.original_review}): r for r in feedback_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(feedback_reviews), desc="Summarizer Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # Find the original review object in the map and enrich it with the summary
                review_map[original_review_obj.original_review].summary = result.summary
            except Exception as exc:
                print(f'Review generated an exception: {exc}')
    execution_time = time.time() - start_time
    log = f"[Summarizer] Processed {len(feedback_reviews)} reviews in {execution_time:.2f}s."
    print(log)
    # Return the full, updated list of reviews
    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}

# Station 3: the data extraction node
def extract_data_node(state: PipelineState):
    """Final station: extract structured data from summarized reviews in parallel."""
    # This station only operates on reviews that already carry a summary
    summarized_reviews = [r for r in state['processed_reviews'] if r.summary is not None]
    if not summarized_reviews:
        print("--- [Station 3: Extractor] No summarized reviews to process. Skipping. ---")
        return {}
    print(f"--- [Station 3: Extractor] Processing {len(summarized_reviews)} summarized reviews... ---")
    start_time = time.time()
    review_map = {r.original_review: r for r in state['processed_reviews']}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(extractor_chain.invoke, {"summary_text": r.summary}): r for r in summarized_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(summarized_reviews), desc="Extractor Progress"):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # Enrich the review object one last time with the extracted data
                review_map[original_review_obj.original_review].extracted_data = result
            except Exception as exc:
                print(f'Review generated an exception: {exc}')
    execution_time = time.time() - start_time
    log = f"[Extractor] Processed {len(summarized_reviews)} reviews in {execution_time:.2f}s."
    print(log)
    return {"processed_reviews": list(review_map.values()), "performance_log": [log]}
These nodes contain the assembly line's filtering logic. summarize_node does not process every review; it picks off the line only the items categorized as "Feedback". Likewise, extract_data_node handles only the items that were successfully summarized. This specialization is one of the pattern's defining features.
Next, we assemble this linear workflow graph with LangGraph.
from langgraph.graph import StateGraph, END

# Initialize the graph
workflow = StateGraph(PipelineState)

# Add the three workstations as nodes
workflow.add_node("triage", triage_node)
workflow.add_node("summarize", summarize_node)
workflow.add_node("extract_data", extract_data_node)

# Define the assembly line's linear flow
workflow.set_entry_point("triage")
workflow.add_edge("triage", "summarize")
workflow.add_edge("summarize", "extract_data")
workflow.add_edge("extract_data", END)

# Compile the graph
app = workflow.compile()
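The analysis below uses num_reviews and the per-stage timings triage_time, summarize_time, and extract_time, none of which are defined in this excerpt. A plausible driver, assuming the timings are recovered from the performance_log entries the nodes emit (the sample reviews here are invented):

```python
import re

# Hypothetical driver: run the assembly line once on a small batch.
sample_reviews = [
    "The battery life on this laptop is incredible, easily two full days.",
    "The app crashes whenever I try to open the settings page.",
    "How do I reset my password?",
    "Love the new camera, photos in low light are much sharper now.",
]
final_state = app.invoke({
    "initial_reviews": sample_reviews,
    "processed_reviews": [],
    "performance_log": [],
})

num_reviews = len(sample_reviews)
# Recover each station's wall-clock time from its log line,
# e.g. "[Triage] Processed 4 reviews in 7.80s."
stage_times = {
    re.match(r"\[(\w+)\]", entry).group(1): float(re.search(r"in ([\d.]+)s", entry).group(1))
    for entry in final_state["performance_log"]
}
triage_time = stage_times.get("Triage", 0.0)
summarize_time = stage_times.get("Summarizer", 0.0)
extract_time = stage_times.get("Extractor", 0.0)
```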

Now for the final, crucial analysis: compare the agent assembly line against a simulated monolithic, sequential agent, to quantify the throughput gain.
# The pipelined workflow's total time is the sum of each stage's time to process the whole batch
pipelined_total_time = triage_time + summarize_time + extract_time
# Throughput is the total number of items processed divided by the total time
pipelined_throughput = num_reviews / pipelined_total_time

# Now simulate a sequential, monolithic agent.
# First, estimate the average time for one review to pass through one stage
avg_time_per_stage_per_review = (triage_time + summarize_time + extract_time) / num_reviews
# A single review's end-to-end latency is the sum of the three stage times
total_latency_per_review = avg_time_per_stage_per_review * 3
# The sequential agent's total time for N reviews is N times the single-review latency
sequential_total_time = total_latency_per_review * num_reviews
sequential_throughput = num_reviews / sequential_total_time

# Compute the percentage increase in throughput
throughput_increase = ((pipelined_throughput - sequential_throughput) / sequential_throughput) * 100

print("="*60)
print(" PERFORMANCE ANALYSIS")
print("="*60)
print("\n--- Assembly Line (Pipelined) Workflow ---")
print(f"Total Time to Process {num_reviews} Reviews: {pipelined_total_time:.2f} seconds")
print(f"Calculated Throughput: {pipelined_throughput:.2f} reviews/second\n")
print("--- Monolithic (Sequential) Workflow (Simulated) ---")
print(f"Avg. Latency For One Review to Complete All Stages: {total_latency_per_review:.2f} seconds")
print(f"Simulated Total Time to Process {num_reviews} Reviews: {sequential_total_time:.2f} seconds")
print(f"Simulated Throughput: {sequential_throughput:.2f} reviews/second\n")
print("="*60)
print(" CONCLUSION")
print("="*60)
print(f"Throughput Increase: {throughput_increase:.0f}%")
Output:
============================================================
PERFORMANCE ANALYSIS
============================================================
--- Assembly Line (Pipelined) Workflow ---
Total Time to Process 10 Reviews: 20.40 seconds
Calculated Throughput: 0.49 reviews/second
--- Monolithic (Sequential) Workflow (Simulated) ---
Avg. Latency For One Review to Complete All Stages: 6.12 seconds
Simulated Total Time to Process 10 Reviews: 61.20 seconds
Simulated Throughput: 0.16 reviews/second
============================================================
CONCLUSION
============================================================
Throughput Increase: 206%
The analysis clearly demonstrates the power of the agent assembly line pattern. Although the end-to-end latency for a single review is about 6 seconds, the pipelined system worked through the entire batch of 10 reviews in just over 20 seconds, whereas the traditional monolithic, sequential approach needed more than 60 seconds.
The assembly line's throughput more than tripled (a 206% increase, 0.49 vs. 0.16 reviews/second). While the extractor is working on one review, the summarizer is already handling another, and the triager may long since have moved on to a third. This parallel, pipelined execution is the key to building high-throughput agentic AI systems that can process data at scale.
[^1]: Building the 14 Key Pillars of Agentic AI: https://levelup.gitconnected.com/building-the-14-key-pillars-of-agentic-ai-229e50f65986
[^2]: Speculative Execution: https://en.wikipedia.org/wiki/Speculative_execution
[^3]: Redundant Execution: https://developer.arm.com/community/arm-community-blogs/b/embedded-and-microcontrollers-blog/posts/comparing-lock-step-redundant-execution-versus-split-lock-technologies
[^4]: 🤖 Agentic Parallelism: A Practical Guide 🚀: https://github.com/FareedKhan-dev/agentic-parallelism