找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3721

积分

0

好友

519

主题
发表于 7 天前 | 查看: 26| 回复: 0

本系列旨在介绍增强现代智能体系统可靠性的关键设计模式。我们将以直观的方式逐一拆解每个概念,阐明其设计目标,并通过简单可行的代码实现,演示其如何融入现实世界的智能体系统。本系列共包含14篇文章,此为第7篇,原文参见 Building the 14 Key Pillars of Agentic AI[^1]。

多代理系统并行处理架构模式图解,包含并行工具处理、分支假设生成、并行评估等12个模块

构建稳健的智能体解决方案,离不开软件工程思想的加持,以确保各个组件能协调、并行运行,并与系统高效交互。例如,预测执行(Speculative Execution)[^2]会尝试预先处理可预测的查询以降低系统时延;而冗余执行(Redundant Execution)[^3]则通过对同一任务启动多个智能体实例来防止单点故障。除此之外,业界还涌现出多种旨在增强智能体系统可靠性的设计模式,例如:

  • 并行工具调用:智能体同时发起多个独立的API调用,以隐藏I/O等待时间。
  • 层级智能体:管理者智能体将复杂任务拆分为多个小步骤,交由不同的执行智能体处理。
  • 竞争性智能体组合:多个智能体针对同一问题提出各自答案,系统从中选取最优解。
  • 并行检索与混合检索:多种检索策略同时运行,以提升召回上下文的质量和相关性。
  • 多跳检索:智能体通过迭代式的检索步骤,逐步收集更深入、更精准的信息。

当然,模式远不止这些。本系列的目标,就是通过代码实现这些最常见AI智能体模式背后的核心概念。所有相关的理论和完整代码均可在GitHub仓库中找到:🤖 Agentic Parallelism: A Practical Guide 🚀[^4]。

代码库结构如下:

agentic-parallelism/
    ├── 01_parallel_tool_use.ipynb
    ├── 02_parallel_hypothesis.ipynb
    ...
    ├── 06_competitive_agent_ensembles.ipynb
    ├── 07_agent_assembly_line.ipynb
    ├── 08_decentralized_blackboard.ipynb
    ...
    ├── 13_parallel_context_preprocessing.ipynb
    └── 14_parallel_multi_hop_retrieval.ipynb

支持大吞吐量的代理装配线

迄今为止,我们探讨的并行模式(如并行工具调用、并行假设生成与评估)都聚焦于减少单个复杂任务的时延,旨在让智能体对单个用户查询的响应更快、更智能。

但如果我们面临的挑战不是任务的复杂性,而是需要连续处理海量任务呢?对于许多生产级应用而言,最关键的指标往往不是处理单个请求的速度,而是系统每小时能够处理的任务总量,即吞吐量。

代理装配线模式与并行非流水线方法的对比示意图

这正是 代理装配线(Agent Assembly Line) 架构大放异彩的场景。这种模式将设计重点从最小化时延转向了最大化吞吐量

其核心思想是:摒弃由单个“庞然大物”般的智能体从头到尾串行处理每个任务的方式,转而将整个流程拆解为一系列专业化的“工作站”。一个任务项在某个工作站完成后,立即被传递给下一个站。所有工作站并行工作,分别处理流水线上处于不同阶段的任务项。

接下来,我们将动手构建一个三阶段的流水线,用于处理一批产品评论。我们的目标是,通过细致的时序分析来证明,相较于传统的顺序处理方法,这种并行流水线架构能显著提升每秒处理的评论数量。

首先,定义代表评论的数据结构。这个结构将在装配线上流动,每个工作站都会逐步丰富其内容。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List, Literal, Optional

class TriageResult(BaseModel):
    """初始分流站的结构化输出"""
    category: Literal["Feedback", "Bug Report", "Support Request", "Irrelevant"] = Field(description="The category of the review.")

class Summary(BaseModel):
    """摘要站的结构化输出"""
    summary: str = Field(description="A one-sentence summary of the key feedback in the review.")

class ExtractedData(BaseModel):
    """数据提取站的结构化输出"""
    product_mentioned: str = Field(description="The specific product the review is about.")
    sentiment: Literal["Positive", "Negative", "Neutral"] = Field(description="The overall sentiment of the review.")
    key_feature: str = Field(description="The main feature or aspect discussed in the review.")

class ProcessedReview(BaseModel):
    """累积所有站点数据的最终的、经过完全处理的评论对象"""
    original_review: str
    category: str
    summary: Optional[str] = None
    extracted_data: Optional[ExtractedData] = None

这些Pydantic模型如同装配线上的“标准化集装箱”。ProcessedReview 对象是中心数据载体,它在第一个工作站(Triage)被创建,随后在流经后续工作站(summaryextracted_data)时被逐步充实。这确保了流水线每个阶段都遵循一致的数据契约。

接下来,定义 GraphState(在此案例中我们称之为 PipelineState),它将作用于一批评论。

from typing import TypedDict, Annotated, List
import operator

class PipelineState(TypedDict):
    # ‘initial_reviews‘ 保存传入的原始评论字符串
    initial_reviews: List[str]
    # ‘processed_reviews‘ 是 ProcessedReview 对象列表,这些对象是在通过流水线移动时构建的
    processed_reviews: List[ProcessedReview]
    performance_log: Annotated[List[str], operator.add]

PipelineState 专为批处理设计。整个装配线将使用 initial_reviews 列表调用一次,其最终输出将是完整的 processed_reviews 列表。

现在,为装配线上的每个工作站定义节点函数。一个关键实现细节是:每个节点都使用 ThreadPoolExecutor 来并行处理分配到自己这个阶段的所有任务项。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from tqdm import tqdm

MAX_WORKERS = 4  # 控制每个工作站的并行度

# 工作站 1: 分流节点
def triage_node(state: PipelineState):
    """第 1 站:并行的对所有初始评论进行分类"""
    print(f“--- [Station 1: Triage] Processing {len(state[‘initial_reviews’])} reviews... ---“)
    start_time = time.time()

    triaged_reviews = []
    # 用 ThreadPoolExecutor 对每个评论进行并行 LLM 调用
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # We create a future for each review to be triaged.
        future_to_review = {executor.submit(triage_chain.invoke, {“review_text“: review}): review for review in state[‘initial_reviews’]}
        for future in tqdm(as_completed(future_to_review), total=len(state[‘initial_reviews’]), desc=“Triage Progress“):
            original_review = future_to_review[future]
            try:
                result = future.result()
                # 创建初始 ProcessedReview 对象
                triaged_reviews.append(ProcessedReview(original_review=original_review, category=result.category))
            except Exception as exc:
                print(f‘Review generated an exception: {exc}‘)

    execution_time = time.time() - start_time
    log = f“[Triage] Processed {len(state[‘initial_reviews’])} reviews in {execution_time:.2f}s.“
    print(log)

    # 该节点的输出是已处理评论的初始列表
    return {“processed_reviews“: triaged_reviews, “performance_log“: [log]}

triage_node 是装配线的入口。其设计关键在于使用了 ThreadPoolExecutor。该节点一次性将所有评论提交给 triage_chain,然后 as_completed 迭代器会在每个LLM调用完成时立即产出结果。这使得我们能高效地构建 triaged_reviews 列表,确保该工作站的耗时仅由最慢的几个LLM调用来决定,而非所有调用耗时的总和。

后续的 summarize_nodeextract_data_node 遵循相同的并行处理模式:首先筛选出自己负责处理的任务项,然后进行并行处理。

# 工作站 2: 总结节点
def summarize_node(state: PipelineState):
    """第 2 站:过滤反馈评论并并行总结"""
    # 本站只处理“反馈”类评论
    feedback_reviews = [r for r in state[‘processed_reviews’] if r.category == “Feedback“]
    if not feedback_reviews:
        print(“--- [Station 2: Summarizer] No feedback reviews to process. Skipping. ---“)
        return {}

    print(f“--- [Station 2: Summarizer] Processing {len(feedback_reviews)} feedback reviews... ---“)
    start_time = time.time()

    # 用 map 来方便的更新评论
    review_map = {r.original_review: r for r in state[‘processed_reviews’]}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(summarizer_chain.invoke, {“review_text“: r.original_review}): r for r in feedback_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(feedback_reviews), desc=“Summarizer Progress“):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # 在 map 中找到原始评论对象,并用摘要来充实
                review_map[original_review_obj.original_review].summary = result.summary
            except Exception as exc:
                print(f‘Review generated an exception: {exc}‘)

    execution_time = time.time() - start_time
    log = f“[Summarizer] Processed {len(feedback_reviews)} reviews in {execution_time:.2f}s.“
    print(log)

    # 返回完整的、更新的评论列表
    return {“processed_reviews“: list(review_map.values()), “performance_log“: [log]}
# 工作站 3: 数据提取节点
def extract_data_node(state: PipelineState):
    """最后一站:并行的从已总结评论中提取结构化数据"""
    # 这个站点只操作已带摘要的评论
    summarized_reviews = [r for r in state[‘processed_reviews’] if r.summary is not None]
    if not summarized_reviews:
        print(“--- [Station 3: Extractor] No summarized reviews to process. Skipping. ---“)
        return {}

    print(f“--- [Station 3: Extractor] Processing {len(summarized_reviews)} summarized reviews... ---“)
    start_time = time.time()

    review_map = {r.original_review: r for r in state[‘processed_reviews’]}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_review = {executor.submit(extractor_chain.invoke, {“summary_text“: r.summary}): r for r in summarized_reviews}
        for future in tqdm(as_completed(future_to_review), total=len(summarized_reviews), desc=“Extractor Progress“):
            original_review_obj = future_to_review[future]
            try:
                result = future.result()
                # 最后一次用提取的数据丰富评论对象
                review_map[original_review_obj.original_review].extracted_data = result
            except Exception as exc:
                print(f‘Review generated an exception: {exc}‘)

    execution_time = time.time() - start_time
    log = f“[Extractor] Processed {len(summarized_reviews)} reviews in {execution_time:.2f}s.“
    print(log)

    return {“processed_reviews“: list(review_map.values()), “performance_log“: [log]}

这些节点包含了装配线的过滤逻辑。summarize_node 并非处理所有评论,它只从流水线上“捞取”被标记为“反馈”的项;同理,extractor_node 只处理那些已被成功总结的项。这种专业化是该模式的核心特性之一。

接下来,我们可以用 LangGraph 来组装这个线性工作流图。

from langgraph.graph import StateGraph, END

# 初始化图
workflow = StateGraph(PipelineState)

# 将三个工作站添加为节点
workflow.add_node(“triage“, triage_node)
workflow.add_node(“summarize“, summarize_node)
workflow.add_node(“extract_data“, extract_data_node)

# 定义装配线的线性流
workflow.set_entry_point(“triage“)
workflow.add_edge(“triage“, “summarize“)
workflow.add_edge(“summarize“, “extract_data“)
workflow.add_edge(“extract_data“, END)

# 编译图
app = workflow.compile()

代理装配线三阶段线性流程图:起始 -> 分流 -> 总结 -> 提取数据 -> 结束

现在,进行最终的关键分析:比较代理装配线与模拟的单一智能体顺序处理方式的性能,以量化吞吐量的提升。

# 流水线工作流程的总时间是每个阶段处理整个批次的时间总和
pipelined_total_time = triage_time + summarize_time + extract_time

# 吞吐量是处理的总件数除以总时间
pipelined_throughput = num_reviews / pipelined_total_time

# 现在模拟顺序的单一代理
# 首先,估算一条评论通过一个阶段所需的平均时间
avg_time_per_stage_per_review = (triage_time + summarize_time + extract_time) / num_reviews

# 单条评论从开始到结束的总时延是三个阶段的时间总和
total_latency_per_review = avg_time_per_stage_per_review * 3

# 顺序代理处理 N 条评论的总时间是单条评论时延的 N 倍
sequential_total_time = total_latency_per_review * num_reviews
sequential_throughput = num_reviews / sequential_total_time

# 计算吞吐量增加百分比
throughput_increase = ((pipelined_throughput - sequential_throughput) / sequential_throughput) * 100

print(“=“*60)
print(“                      PERFORMANCE ANALYSIS“)
print(“=“*60)
print(“\n--- Assembly Line (Pipelined) Workflow ---“)
print(f“Total Time to Process {num_reviews} Reviews: {pipelined_total_time:.2f} seconds“)
print(f“Calculated Throughput: {pipelined_throughput:.2f} reviews/second\n“)
print(“--- Monolithic (Sequential) Workflow (Simulated) ---“)
print(f“Avg. Latency For One Review to Complete All Stages: {total_latency_per_review:.2f} seconds“)
print(f“Simulated Total Time to Process {num_reviews} Reviews: {sequential_total_time:.2f} seconds“)
print(f“Simulated Throughput: {sequential_throughput:.2f} reviews/second\n“)
print(“=“*60)
print(“                        CONCLUSION“)
print(“=“*60)
print(f“Throughput Increase: {throughput_increase:.0f}%“)

输出

============================================================
                      PERFORMANCE ANALYSIS
============================================================

--- Assembly Line (Pipelined) Workflow ---
Total Time to Process 10 Reviews: 20.40 seconds
Calculated Throughput: 0.49 reviews/second

--- Monolithic (Sequential) Workflow (Simulated) ---
Avg. Latency For One Review to Complete All Stages: 6.12 seconds
Simulated Total Time to Process 10 Reviews: 61.20 seconds
Simulated Throughput: 0.16 reviews/second

============================================================
                        CONCLUSION
============================================================
Throughput Increase: 206%

分析结果清晰地展示了代理装配线模式的强大威力。虽然处理单条评论从头到尾的时延(Latency)大约是6秒,但流水线系统在20秒出头的时间里就处理完了整批10条数据。相比之下,传统的单一智能体串行处理方式则需要超过60秒。

代理装配线的吞吐量提升了三倍以上(206%)。这是因为当“数据提取器”在处理第一条评论时,“总结器”已经在处理第二条,而“分流器”则可能早就开始处理第三条了。这种并行化、流水线式的执行,正是构建能够处理海量数据的高吞吐量AI智能体系统的关键所在。

技术实践的分享与交流能加速创新。如果你对AI智能体、Python或系统架构有更多想法,欢迎在云栈社区与同行们继续深入探讨。

[^1]: Building the 14 Key Pillars of Agentic AI: https://levelup.gitconnected.com/building-the-14-key-pillars-of-agentic-ai-229e50f65986
[^2]: 预测执行: https://en.wikipedia.org/wiki/Speculative_execution
[^3]: 冗余执行: https://developer.arm.com/community/arm-community-blogs/b/embedded-and-microcontrollers-blog/posts/comparing-lock-step-redundant-execution-versus-split-lock-technologies
[^4]: 🤖 Agentic Parallelism: A Practical Guide 🚀: https://github.com/FareedKhan-dev/agentic-parallelism




上一篇:LangChain与LangGraph实现去中心化黑板协作:构建高可靠AI客户支持系统
下一篇:与门、或门、非门工作原理与电路实例:从逻辑基础到数字电路设计
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-23 07:32 , Processed in 0.448032 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表