找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1180

积分

1

好友

161

主题
发表于 15 小时前 | 查看: 1| 回复: 0

LangGraph正成为构建可控、可观测、可插拔AI工作流的热门框架。无论是智能客服、多步骤智能体,还是自动化运维与AI建站,都能基于它实现高效的业务编排。

然而,许多开发者在初次接触时,常因对其整体框架和运行机制缺乏系统性认知而感到无从下手。

本文旨在用最短的时间,帮助开发者建立对LangGraph的“整体认知”。我们将避开抽象概念的堆砌,通过一个极简但完整的旅游规划助手示例,直观展示LangGraph中六个核心知识点:State(状态)、Node(节点)、流程控制、工具调用、状态持久化以及人工干预的具体实现。

你将通过本文轻松掌握:

  • Graph的搭建与运作机制。
  • 节点、状态、工具与流程之间的关系。
  • 如何编排顺序、分支、循环及并行业务流。

项目示例:智能旅游规划助手

我们以规划一个“云南5天游,预算8000元”的行程为例,其核心流程框架如下:

开始 → 解析意图 → 并行查询 → 汇总结果 → 预算评估
                          ↓
预算优化循环(最多3次)←→ 人工干预(条件分支)
                          ↓
生成最终行程 → 文件写入 → 结束
                          ↓
中断恢复支持(任意节点可恢复)

流程步骤说明:

  1. 解析意图:调用大模型解析用户输入,提取地点、天数、预算等关键信息。若信息不全,则请求用户补充。
  2. 顺序验证:依次验证预算、目的地、时间及个人信息的可行性。
  3. 并行查询:同时查询机票、酒店、景点信息。
  4. 汇总结果:合并所有并行查询的结果。
  5. 循环优化:进行预算优化与行程优化(循环逻辑)。
  6. 生成行程:调用大模型生成最终的行程表。
  7. 保存结果:调用工具将行程保存为文件。

接下来,我们逐一拆解这些流程在LangGraph中的实现。

1. State(状态):工作流的“全局数据上下文”

State是整个工作流的“数据总线”,所有节点(Node)和工具都从中读取数据,并将处理结果写回。它本质上是一个全局共享的字典。在人工智能工作流中,保持状态的一致性至关重要。

class TravelState(TypedDict):
    """旅游规划状态定义"""
    messages: Annotated[Sequence[BaseMessage], add_messages]
    input: Optional[str]  # 用户原始输入
    travel_info: Optional[Dict[str, Any]]  # 旅行基本信息:目的地、天数、预算等
    query_results: Optional[Dict[str, Any]]  # 合并后的查询结果
    cost_analysis: Optional[Dict[str, Any]]  # 成本分析
    itinerary: Optional[str]  # 生成的行程
    status: str  # 当前状态
    _control: Optional[Dict[str, Any]]  # 内部流程控制字段
    # 并行查询结果独立字段
    flight_info: Optional[Dict[str, Any]]
    hotel_info: Optional[Dict[str, Any]]
    attractions_info: Optional[Dict[str, Any]]

无论是查询、评估还是生成,所有数据都在State中流转。

核心要点:State是流程的“唯一真相源”,是所有节点间协作的基础。

2. Node(节点):执行具体任务的“功能单元”

Node是Graph的基本执行单元,每个Node都是一个独立的“步骤函数”,专注于单一职责。

"""创建旅游规划Graph"""
workflow = StateGraph(TravelState)

# 添加核心处理节点
workflow.add_node("parse_intent", node_parse_intent)              # 解析用户意图
# 顺序执行节点链
workflow.add_node("validate_budget", node_validate_budget)        # 1️⃣ 预算验证
workflow.add_node("check_destination", node_check_destination)    # 2️⃣ 目的地检查
workflow.add_node("verify_travel_time", node_verify_travel_time)  # 3️⃣ 时间检查
workflow.add_node("check_documents", node_check_documents)        # 4️⃣ 个人信息验证
# 并行执行节点
workflow.add_node("query_flights", node_query_flights)            # 查询航班
workflow.add_node("query_hotels", node_query_hotels)              # 查询酒店
workflow.add_node("query_attractions", node_query_attractions)    # 查询景点
workflow.add_node("aggregate_results", node_aggregate_parallel_results) # 汇总结果
# 循环执行节点
workflow.add_node("budget_optimization", node_budget_optimization)              # 预算优化
workflow.add_node("check_budget_satisfaction", node_check_budget_satisfaction)  # 检查预算满意度
workflow.add_node("generate_itinerary", node_generate_itinerary)  # 生成最终行程

通过将流程拆分为细粒度的Node,后期的流程编排将变得非常灵活。

核心要点:Node = 单功能步骤函数。模块化是灵活编排的前提。

3. Control Flow(流程控制):决定下一步的“导航逻辑”

LangGraph支持多种流程组织方式,即控制流。

① 顺序执行

最基本的流程,节点按既定顺序依次执行。

# 顺序执行链:预算验证 → 目的地检查 → 时间验证 → 文件检查
workflow.add_edge("validate_budget", "check_destination")      # 1️⃣ → 2️⃣
workflow.add_edge("check_destination", "verify_travel_time")   # 2️⃣ → 3️⃣
workflow.add_edge("verify_travel_time", "check_documents")     # 3️⃣ → 4️⃣
workflow.add_edge("check_documents", "start_parallel")         # 4️⃣ → 并行开始节点

适用场景:有明确、固定步骤的业务流水线。

② 条件分支

根据运行时状态动态决定下一步走向。例如,在解析意图后,根据状态决定是开始验证、直接进入并行查询还是结束。

def after_parse_router(state: TravelState) -> str:
    control = state.get("_control", {})
    status = state.get("status", "")
    if status == "collecting_info":
        return END  # 需要用户补充信息,暂停
    elif status == "planning" and not control.get("validation_completed"):
        return "validate_budget"  # 开始顺序验证
    elif status == "planning" and control.get("validation_completed"):
        return "start_parallel"   # 跳过验证,直接并行查询
    else:
        return END  # 其他情况结束

workflow.add_conditional_edges(
    "parse_intent",
    after_parse_router,
    {
        "validate_budget": "validate_budget",
        "start_parallel": "start_parallel",
        END: END
    }
)

核心要点:条件分支让工作流具备了基于状态的动态路由能力。

③ 循环执行

通过节点间的条件路由,形成循环逻辑。例如,预算优化循环最多尝试3次。

def budget_satisfaction_router(state: TravelState) -> str:
    control = state.get("_control", {})
    attempts = control.get("budget_optimization_attempts", 0)
    satisfied = control.get("budget_satisfied", False)

    if satisfied:
        return "itinerary_optimization"  # 满意则退出循环
    elif attempts >= 3:
        return "human_intervention"      # 超尝试次数,转人工
    else:
        return "budget_optimization"     # 否则继续优化

workflow.add_edge("budget_optimization", "check_budget_satisfaction")
workflow.add_conditional_edges(
    "check_budget_satisfaction",
    budget_satisfaction_router,
    {
        "budget_optimization": "budget_optimization",
        "itinerary_optimization": "itinerary_optimization",
        "human_intervention": "human_intervention"
    }
)

核心要点:循环的本质是节点通过条件分支再次指向自身或前序节点。

④ 并行执行

多个节点基于同一份State同时执行,最后在汇聚节点合并结果。例如,同时查询航班、酒店、景点。

# 从 start_parallel 同时触发3个查询
workflow.add_edge("start_parallel", "query_flights")
workflow.add_edge("start_parallel", "query_hotels")
workflow.add_edge("start_parallel", "query_attractions")
# 所有并行查询完成后,汇聚到 aggregate_results
workflow.add_edge("query_flights", "aggregate_results")
workflow.add_edge("query_hotels", "aggregate_results")
workflow.add_edge("query_attractions", "aggregate_results")

核心要点:并行提升了效率,汇聚节点负责结果合并,是实现复杂流程编排的关键。

4. 工具调用:让智能体“能动起来”

工具使AI能够执行具体操作。例如,将生成的行程保存到文件。
首先,定义一个写入文件的工具:

@tool
def write_itinerary_to_file(itinerary_content: str, filename: str = "travel_plan.md") -> str:
    """将行程内容写入Markdown文件。"""
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(itinerary_content)
    return f"行程已成功保存至 {filename}"

然后,创建工具节点并将其接入工作流:

# 创建包含所有工具的工具节点
all_tools = [write_itinerary_to_file]
tool_node = ToolNode(all_tools)

# 在生成行程后,连接至工具调用节点
workflow.add_edge("generate_itinerary", "write_itinerary_file")
workflow.add_edge("write_itinerary_file", "tool_node") # 此节点准备调用参数
workflow.add_edge("tool_node", END) # 工具执行后结束

若希望LLM自主决定是否及如何调用工具,可使用bind_tools并让模型返回工具调用请求,再由ToolNode实际执行。

核心要点:工具是连接AI决策与真实世界操作的桥梁。

5. 状态持久化:实现工作流的“暂停与恢复”

通过配置检查点(Checkpointer),可以让Graph的状态持久化,从而支持长时间运行、中断恢复和流程审计。这是构建生产级云原生AI应用的重要特性。

import asyncio
from langgraph.checkpoint.sqlite import AsyncSqliteSaver

async def main():
    async with AsyncSqliteSaver.from_conn_string("travel_planning.db") as checkpointer:
        # 编译Graph时传入 checkpointer
        app = workflow.compile(checkpointer=checkpointer)
        # 运行应用,状态会自动保存至数据库
        # config 中可包含 thread_id 用于恢复特定流程
        config = {"configurable": {"thread_id": "user_123_trip_1"}}
        final_state = await app.ainvoke({"input": "我想去云南旅游5天,预算8000"}, config=config)

核心要点:状态持久化使工作流具备了状态管理和可恢复性,适合复杂、长周期的任务。

6. 人工干预:在自动化中引入“决策点”

当流程遇到无法自动处理的情况(如预算严重超支需用户确认)时,可以暂停并等待人工输入。

def node_human_intervention(state: TravelState) -> TravelState:
    """人工干预节点:预算超支时等待用户决策。"""
    cost_analysis = state.get("cost_analysis", {})
    total_cost = cost_analysis.get("total_cost", 0)
    budget = cost_analysis.get("budget", 0)
    overspend = total_cost - budget

    # 生成建议并更新状态,等待外部输入
    suggestions = [f"当前超支 {overspend:,}元,建议调整酒店等级或出行日期。"]
    return {
        **state,
        "status": "waiting_confirmation",
        "_control": {
            **state.get("_control", {}),
            "waiting_confirmation": True,
            "suggestions": suggestions
        },
        "messages": state["messages"] + [
            AIMessage(content=f"""
⚠️ 预算超支提醒:超支{overspend:,}元。
🤔 请决策:
1. 接受优化建议 (输入‘接受’)
2. 保持原方案 (输入‘保持’)
3. 终止规划 (输入‘终止’)
            """)
        ]
    }

当用户在外部提供输入(如“接受”)后,可更新state['_control']中的user_confirmed等字段,并重新触发Graph,流程将根据新状态继续执行。

核心要点:人工干预节点使全自动工作流具备了关键决策的灵活性。

总结

本文基于LangGraph 1.0,通过一个智能旅游规划助手的完整案例,系统演示了其六大核心概念与功能的实战应用。掌握State、Node、流程控制、工具调用、状态持久化与人工干预,你便能将复杂的AI业务逻辑拆解为清晰、可控的图结构工作流。这为构建可靠、可维护的智能体(Agent)与AI应用提供了坚实的实践基础。

完整代码示例已开源,可供进一步参考与实践。


(说明:为符合优化要求,已移除原文中的推广信息、互动提示、联系方式及“往期回顾”等部分,并已对部分表述进行了润色与重组,核心技术与代码逻辑保持不变。)




上一篇:12月15日安全动态:Windows LNK漏洞、VSCode恶意扩展与固件逆向分析
下一篇:Windows RasMan漏洞深度分析:权限提升与CVE-2025-59230高危攻击链
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-17 16:01 , Processed in 0.107172 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表