LangGraph正成为构建可控、可观测、可插拔AI工作流的热门框架。无论是智能客服、多步骤智能体,还是自动化运维与AI建站,都能基于它实现高效的业务编排。
然而,许多开发者在初次接触时,常因对其整体框架和运行机制缺乏系统性认知而感到无从下手。
本文旨在用最短的时间,帮助开发者建立对LangGraph的“整体认知”。我们将避开抽象概念的堆砌,通过一个极简但完整的旅游规划助手示例,直观展示LangGraph中六个核心知识点:State(状态)、Node(节点)、流程控制、工具调用、状态持久化以及人工干预的具体实现。
你将通过本文轻松掌握:
- Graph的搭建与运作机制。
- 节点、状态、工具与流程之间的关系。
- 如何编排顺序、分支、循环及并行业务流。
项目示例:智能旅游规划助手
我们以规划一个“云南5天游,预算8000元”的行程为例,其核心流程框架如下:
开始 → 解析意图 → 并行查询 → 汇总结果 → 预算评估
↓
预算优化循环(最多3次)←→ 人工干预(条件分支)
↓
生成最终行程 → 文件写入 → 结束
↓
中断恢复支持(任意节点可恢复)
流程步骤说明:
- 解析意图:调用大模型解析用户输入,提取地点、天数、预算等关键信息。若信息不全,则请求用户补充。
- 顺序验证:依次验证预算、目的地、时间及个人信息的可行性。
- 并行查询:同时查询机票、酒店、景点信息。
- 汇总结果:合并所有并行查询的结果。
- 循环优化:进行预算优化与行程优化(循环逻辑)。
- 生成行程:调用大模型生成最终的行程表。
- 保存结果:调用工具将行程保存为文件。
接下来,我们逐一拆解这些流程在LangGraph中的实现。
1. State(状态):工作流的“全局数据上下文”
State是整个工作流的“数据总线”,所有节点(Node)和工具都从中读取数据,并将处理结果写回。它本质上是一个全局共享的字典。在人工智能工作流中,保持状态的一致性至关重要。
class TravelState(TypedDict):
"""旅游规划状态定义"""
messages: Annotated[Sequence[BaseMessage], add_messages]
input: Optional[str] # 用户原始输入
travel_info: Optional[Dict[str, Any]] # 旅行基本信息:目的地、天数、预算等
query_results: Optional[Dict[str, Any]] # 合并后的查询结果
cost_analysis: Optional[Dict[str, Any]] # 成本分析
itinerary: Optional[str] # 生成的行程
status: str # 当前状态
_control: Optional[Dict[str, Any]] # 内部流程控制字段
# 并行查询结果独立字段
flight_info: Optional[Dict[str, Any]]
hotel_info: Optional[Dict[str, Any]]
attractions_info: Optional[Dict[str, Any]]
无论是查询、评估还是生成,所有数据都在State中流转。
核心要点:State是流程的“唯一真相源”,是所有节点间协作的基础。
2. Node(节点):执行具体任务的“功能单元”
Node是Graph的基本执行单元,每个Node都是一个独立的“步骤函数”,专注于单一职责。
"""创建旅游规划Graph"""
workflow = StateGraph(TravelState)
# 添加核心处理节点
workflow.add_node("parse_intent", node_parse_intent) # 解析用户意图
# 顺序执行节点链
workflow.add_node("validate_budget", node_validate_budget) # 1️⃣ 预算验证
workflow.add_node("check_destination", node_check_destination) # 2️⃣ 目的地检查
workflow.add_node("verify_travel_time", node_verify_travel_time) # 3️⃣ 时间检查
workflow.add_node("check_documents", node_check_documents) # 4️⃣ 个人信息验证
# 并行执行节点
workflow.add_node("query_flights", node_query_flights) # 查询航班
workflow.add_node("query_hotels", node_query_hotels) # 查询酒店
workflow.add_node("query_attractions", node_query_attractions) # 查询景点
workflow.add_node("aggregate_results", node_aggregate_parallel_results) # 汇总结果
# 循环执行节点
workflow.add_node("budget_optimization", node_budget_optimization) # 预算优化
workflow.add_node("check_budget_satisfaction", node_check_budget_satisfaction) # 检查预算满意度
workflow.add_node("generate_itinerary", node_generate_itinerary) # 生成最终行程
通过将流程拆分为细粒度的Node,后期的流程编排将变得非常灵活。
核心要点:Node = 单功能步骤函数。模块化是灵活编排的前提。
3. Control Flow(流程控制):决定下一步的“导航逻辑”
LangGraph支持多种流程组织方式,即控制流。
① 顺序执行
最基本的流程,节点按既定顺序依次执行。
# 顺序执行链:预算验证 → 目的地检查 → 时间验证 → 文件检查
workflow.add_edge("validate_budget", "check_destination") # 1️⃣ → 2️⃣
workflow.add_edge("check_destination", "verify_travel_time") # 2️⃣ → 3️⃣
workflow.add_edge("verify_travel_time", "check_documents") # 3️⃣ → 4️⃣
workflow.add_edge("check_documents", "start_parallel") # 4️⃣ → 并行开始节点
适用场景:有明确、固定步骤的业务流水线。
② 条件分支
根据运行时状态动态决定下一步走向。例如,在解析意图后,根据状态决定是开始验证、直接进入并行查询还是结束。
def after_parse_router(state: TravelState) -> str:
control = state.get("_control", {})
status = state.get("status", "")
if status == "collecting_info":
return END # 需要用户补充信息,暂停
elif status == "planning" and not control.get("validation_completed"):
return "validate_budget" # 开始顺序验证
elif status == "planning" and control.get("validation_completed"):
return "start_parallel" # 跳过验证,直接并行查询
else:
return END # 其他情况结束
workflow.add_conditional_edges(
"parse_intent",
after_parse_router,
{
"validate_budget": "validate_budget",
"start_parallel": "start_parallel",
END: END
}
)
核心要点:条件分支让工作流具备了基于状态的动态路由能力。
③ 循环执行
通过节点间的条件路由,形成循环逻辑。例如,预算优化循环最多尝试3次。
def budget_satisfaction_router(state: TravelState) -> str:
control = state.get("_control", {})
attempts = control.get("budget_optimization_attempts", 0)
satisfied = control.get("budget_satisfied", False)
if satisfied:
return "itinerary_optimization" # 满意则退出循环
elif attempts >= 3:
return "human_intervention" # 超尝试次数,转人工
else:
return "budget_optimization" # 否则继续优化
workflow.add_edge("budget_optimization", "check_budget_satisfaction")
workflow.add_conditional_edges(
"check_budget_satisfaction",
budget_satisfaction_router,
{
"budget_optimization": "budget_optimization",
"itinerary_optimization": "itinerary_optimization",
"human_intervention": "human_intervention"
}
)
核心要点:循环的本质是节点通过条件分支再次指向自身或前序节点。
④ 并行执行
多个节点基于同一份State同时执行,最后在汇聚节点合并结果。例如,同时查询航班、酒店、景点。
# 从 start_parallel 同时触发3个查询
workflow.add_edge("start_parallel", "query_flights")
workflow.add_edge("start_parallel", "query_hotels")
workflow.add_edge("start_parallel", "query_attractions")
# 所有并行查询完成后,汇聚到 aggregate_results
workflow.add_edge("query_flights", "aggregate_results")
workflow.add_edge("query_hotels", "aggregate_results")
workflow.add_edge("query_attractions", "aggregate_results")
核心要点:并行提升了效率,汇聚节点负责结果合并,是实现复杂流程编排的关键。
4. 工具调用:让智能体“能动起来”
工具使AI能够执行具体操作。例如,将生成的行程保存到文件。
首先,定义一个写入文件的工具:
@tool
def write_itinerary_to_file(itinerary_content: str, filename: str = "travel_plan.md") -> str:
"""将行程内容写入Markdown文件。"""
with open(filename, 'w', encoding='utf-8') as f:
f.write(itinerary_content)
return f"行程已成功保存至 {filename}"
然后,创建工具节点并将其接入工作流:
# 创建包含所有工具的工具节点
all_tools = [write_itinerary_to_file]
tool_node = ToolNode(all_tools)
# 在生成行程后,连接至工具调用节点
workflow.add_edge("generate_itinerary", "write_itinerary_file")
workflow.add_edge("write_itinerary_file", "tool_node") # 此节点准备调用参数
workflow.add_edge("tool_node", END) # 工具执行后结束
若希望LLM自主决定是否及如何调用工具,可使用bind_tools并让模型返回工具调用请求,再由ToolNode实际执行。
核心要点:工具是连接AI决策与真实世界操作的桥梁。
5. 状态持久化:实现工作流的“暂停与恢复”
通过配置检查点(Checkpointer),可以让Graph的状态持久化,从而支持长时间运行、中断恢复和流程审计。这是构建生产级云原生AI应用的重要特性。
import asyncio
from langgraph.checkpoint.sqlite import AsyncSqliteSaver
async def main():
async with AsyncSqliteSaver.from_conn_string("travel_planning.db") as checkpointer:
# 编译Graph时传入 checkpointer
app = workflow.compile(checkpointer=checkpointer)
# 运行应用,状态会自动保存至数据库
# config 中可包含 thread_id 用于恢复特定流程
config = {"configurable": {"thread_id": "user_123_trip_1"}}
final_state = await app.ainvoke({"input": "我想去云南旅游5天,预算8000"}, config=config)
核心要点:状态持久化使工作流具备了状态管理和可恢复性,适合复杂、长周期的任务。
6. 人工干预:在自动化中引入“决策点”
当流程遇到无法自动处理的情况(如预算严重超支需用户确认)时,可以暂停并等待人工输入。
def node_human_intervention(state: TravelState) -> TravelState:
"""人工干预节点:预算超支时等待用户决策。"""
cost_analysis = state.get("cost_analysis", {})
total_cost = cost_analysis.get("total_cost", 0)
budget = cost_analysis.get("budget", 0)
overspend = total_cost - budget
# 生成建议并更新状态,等待外部输入
suggestions = [f"当前超支 {overspend:,}元,建议调整酒店等级或出行日期。"]
return {
**state,
"status": "waiting_confirmation",
"_control": {
**state.get("_control", {}),
"waiting_confirmation": True,
"suggestions": suggestions
},
"messages": state["messages"] + [
AIMessage(content=f"""
⚠️ 预算超支提醒:超支{overspend:,}元。
🤔 请决策:
1. 接受优化建议 (输入‘接受’)
2. 保持原方案 (输入‘保持’)
3. 终止规划 (输入‘终止’)
""")
]
}
当用户在外部提供输入(如“接受”)后,可更新state['_control']中的user_confirmed等字段,并重新触发Graph,流程将根据新状态继续执行。
核心要点:人工干预节点使全自动工作流具备了关键决策的灵活性。
总结
本文基于LangGraph 1.0,通过一个智能旅游规划助手的完整案例,系统演示了其六大核心概念与功能的实战应用。掌握State、Node、流程控制、工具调用、状态持久化与人工干预,你便能将复杂的AI业务逻辑拆解为清晰、可控的图结构工作流。这为构建可靠、可维护的智能体(Agent)与AI应用提供了坚实的实践基础。
完整代码示例已开源,可供进一步参考与实践。
(说明:为符合优化要求,已移除原文中的推广信息、互动提示、联系方式及“往期回顾”等部分,并已对部分表述进行了润色与重组,核心技术与代码逻辑保持不变。)