随着大语言模型的快速发展,构建具备自主决策与执行能力的智能体已成为应用开发的核心。然而,单一智能体在处理需要多步骤协作、涉及多个专业领域的复杂任务时,常常显得力不从心。为了应对这一挑战,多智能体系统应运而生。它将复杂的整体任务分解为多个子任务,由不同的专业智能体协同处理,最终合并结果,极大地扩展了大模型的应用边界。LangGraph 作为 LangChain 生态系统的重要扩展,通过引入有向图模型来重构智能体的工作流,将复杂的交互与决策过程模块化,是目前主流的多智能体集成框架之一。
1. LangGraph概述
1.1 什么是LangGraph
LangGraph 是专为构建基于大模型的、有状态、多智能体复杂应用而设计的框架。其核心思想是将应用的工作流程抽象为一个有向图结构,通过节点(Node)和边(Edge)来定义任务的执行步骤与逻辑流向。相比传统的线性链式调用,LangGraph 支持条件分支、循环、并行等复杂控制流,能够实现状态持久化、断点续跑、时间旅行、人机协作等高级功能,并提供了多智能体协作、层级架构等多种成熟模式。该框架已成功应用于智能客服、自动化运维、研究型Agent等场景,展现出卓越的适应性和扩展性。
LangGraph在Github上的热度变化

1.2 为什么使用LangGraph
相比传统模式,LangGraph 具备显著优势:
- 强大的状态管理:提供集中式的状态管理机制,允许Agent在不同节点间传递和维护上下文信息(如长期记忆、多轮对话历史),避免了状态分散的问题,提高了系统的可维护性和可观测性。
- 灵活的执行控制:通过定义节点和边,可以精确控制Agent的执行逻辑,包括条件分支、循环和并行执行等,实现远超线性流程的复杂业务逻辑。
- 无缝的工具集成:能够轻松集成各种外部工具(搜索引擎、数据库、API等),使Agent能够获取实时信息并执行特定操作,极大扩展了LLM的能力边界。
- 卓越的可观测性与可调试性:图结构使得Agent的运行路径清晰可见,便于理解其决策过程,并在出现问题时快速定位和调试。
- 高度的模块化与可复用性:每个节点都是一个独立的、可复用的组件。通过子图机制,复杂工作流可被分解为多个可独立开发和测试的模块,提升开发和测试效率。
1.3 安装与使用
通过pip安装LangGraph:
pip install -U langgraph
使用LangGraph创建一个简单的ReAct Agent:
def get_weather(city: str) -> str:
"""获取指定城市的天气信息。"""
return f"今天{city}是晴天"
# 创建模型
model = ChatOpenAI(
model_name=model_name,
base_url=base_url,
api_key=api_key
)
# 使用LangGraph提供的API创建Agent
agent = create_react_agent(
model=model, # 添加模型
tools=[get_weather], # 添加工具
prompt="你是一个天气助手"
)
human_message = HumanMessage(content="今天深圳天气怎么样?")
response = agent.invoke(
{"messages": [human_message]}
)
print(response)

运行模式:Agent支持同步(.invoke() 或 .stream())和异步(await .ainvoke() 或 async for 配合 .astream())两种执行模式。
最大迭代次数:为避免Agent陷入无限循环,可以设置递归限制。
response = agent.invoke(
{"messages": [{"role": "user", "content": "预定一个深圳到北京的机票"}]},
{"recursion_limit": 10} # 指定最大迭代次数
)
2. LangGraph核心
2.1 Graph(图)

图是由节点和边组成的、用于描述节点间关系的数据结构。LangGraph使用有向图来定义AI工作流中的执行步骤和顺序,从而实现复杂、有状态、可循环的应用程序逻辑。
2.2 LangGraph核心要素(State, Edge, Node)
1. State(状态)
State是贯穿整个工作流执行过程的共享数据结构,代表了当前系统的快照。它存储了从工作流开始到当前节点的所有必要信息(如历史对话、检索到的文档、工具执行结果等),在各个节点间共享,且每个节点都可以修改它。State可以是TypedDict类型,也可以是pydantic中的BaseModel类型。
# 定义状态
class GraphState(TypedDict):
process_data: dict # 默认更新策略为替换(后续会讲更新策略)
# 创建一个状态图,并指定状态
graph = StateGraph(GraphState)
2. Node(节点)
Node是工作流中的一个基本处理单元,代表一个操作步骤。它可以是一个Agent、一次大模型调用、一个工具或一个自定义函数。节点设计应遵循单一职责、无状态、幂等和可测试的原则。
# 定义一个节点,入参为state
def input_node(state: GraphState) -> GraphState:
print(state)
return {"process_data": {"input": "input_value"}}
# 定义带参数的node节点
def process_node(state: dict, param1: int, param2: str) -> dict:
print(state, param1, param2)
return {"process_data": {"process": "process_value"}}
graph = StateGraph(GraphState)
# 添加input节点
graph.add_node("input", input_node)
# 给process_node节点绑定参数
process_with_params = partial(process_node, param1=100, param2="test")
# 添加带参数的node节点
graph.add_node("process", process_with_params)
特殊节点:__START__(开始节点)和__END__(结束节点)是两个特殊节点,用于定义工作流的起点和终点。可以通过graph.add_edge(START, "node_start")或graph.set_entry_point("node_start")设置起点;通过graph.add_edge("node_a", END)或graph.set_finish_point("node_end")设置终点。
错误处理和重试机制:
LangGraph提供了重试机制来保证系统可靠性。
# 重试策略
retry_policy = RetryPolicy(
max_attempts=3, # 最大重试次数
initial_interval=1, # 初始间隔
jitter=True, # 抖动(添加随机性避免重试风暴)
backoff_factor=2, # 退避乘数(每次重试间隔时间的增长倍数)
retry_on=[RequestException, Timeout] # 只重试这些异常
)
graph.add_node("process", process_node, retry=retry_policy)
节点缓存:
LangGraph支持根据节点输入进行缓存,以加快响应速度。缓存键由key_func根据输入数据生成,可通过ttl参数控制有效期。
class State(TypedDict):
x: int
result: int
builder = StateGraph(State)
def expensive_node(state: State) -> dict[str, int]:
# 模拟耗时操作
time.sleep(2)
return {"result": state["x"] * 2}
# 添加节点,并指定缓存策略(TTL为3秒)
builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
builder.set_entry_point("expensive_node")
builder.set_finish_point("expensive_node")
graph = builder.compile(cache=InMemoryCache())
3. Edge(边)
Edge定义了节点之间的连接关系、执行顺序以及通信方式。一个节点可以有多个出边,多个节点也可以指向同一个节点(如Map-Reduce模式)。
# 添加固定边,执行顺序:start -> input -> process -> output -> end
graph.add_edge(START, "input")
graph.add_edge("input", "process")
graph.add_edge("process", "output")
graph.add_edge("output", END)
4. 构建一个完整的图
图的构建遵循标准流程:初始化StateGraph -> 添加节点 -> 定义边 -> (可选)设置入口/出口 -> 编译 -> 执行。
# 定义状态
class GraphState(TypedDict):
process_data: dict
def input_node(state: GraphState) -> GraphState:
print(state)
return {"process_data": {"input": "input_value"}}
def output_node(state: GraphState) -> GraphState:
print(state)
return {"process_data": {"output": "output_value"}}
def process_node(state: dict) -> dict:
print(state)
return {"process_data": {"process": "process_value"}}
# 创建一个状态图,并指定状态
graph = StateGraph(GraphState)
# 添加input、process、output节点
graph.add_node("input", input_node)
graph.add_node("process", process_node)
graph.add_node("output", output_node)
# 添加固定边,定义执行顺序
graph.add_edge(START, "input")
graph.add_edge("input", "process")
graph.add_edge("process", "output")
graph.add_edge("output", END)
# 编译图,编译器会检查图结构的正确性
app = graph.compile()
# 执行工作流
app.invoke({})

2.3 状态合并策略(Reducers)
Reducer定义了多个节点之间State的更新规则(如覆盖、合并、追加等)。
1. 直接覆盖:如果没有为状态字段指定 Reducer,默认会覆盖更新。即后执行节点的返回值会直接覆盖先执行节点的值。
class OverrideState(TypedDict):
process_data : dict # 未指定合并策略,默认覆盖
2. Annotated:使用类型注解指定内置合并策略(如add用于数字相加、列表合并、字符串拼接)。
class AddState(TypedDict):
data_int: Annotated[int, add] # 数字相加
data_list: Annotated[list, add] # 合并两个列表
data_str: Annotated[str, add] # 字符串拼接
def add_node1(state: AddState) -> AddState:
print(state)
return {"data_int": 1, "data_list": [1], "data_str": "hello "}
def add_node2(state: AddState) -> AddState:
print(state)
return {"data_int": 2, "data_list": [2], "data_str": "world"}
3. 内置Reducer:add_messages(消息列表合并)
这是LangGraph提供的专用Reducer,用于智能合并消息列表,不只是简单的追加。它能处理消息的追加、旧消息的覆盖(用于更新工具调用的中间结果)以及自动类型转换。
class MessageState(TypedDict):
# 消息列表,使用add_messages合并消息列表
messages: Annotated[list, add_messages]
def system_node(state: MessageState) -> dict:
return {"messages": [SystemMessage(content="你是一个精通LangGraph的专家工程师.")]}
def user_input_node(state: MessageState) -> dict:
return {"messages": [HumanMessage(content="什么是LangGraph?")]}
def ai_response_node(state: MessageState) -> dict:
return {"messages": [AIMessage(content="LangGraph是一个...")]}
def tool_node(state: MessageState) -> dict:
return {"messages": [ToolMessage(content="工具调用参数params1", tool_call_id="tool_call_id")]}
4. 自定义Reducer:实现自定义合并逻辑。
def merge_dict_reducer(source: dict, new: dict) -> dict:
# 自定义合并逻辑
result = source.copy()
result.update(new)
return result
def max_reducer(source: int, new: int) -> int:
# 自定义合并逻辑
return max(source, new)
class CustomReducerState(TypedDict):
# 使用自定义Reducer的状态
max_score: Annotated[int, max_reducer] # 保留最大值
metadata: Annotated[dict, merge_dict_reducer] # 字典合并
2.4 条件边(Conditional Edge)
在实际应用中,工作流的下一个节点可能并非固定,需要根据当前执行状态动态决定。条件边通过一个路由函数,根据状态返回值来选择具体要执行的节点。

def route_by_sentiment(state: GraphState) -> str:
# 路由逻辑...返回最终的条件标签
return "condition_1"
graph = StateGraph(GraphState)
graph.add_node("node1", node1)
graph.add_node("node2", node2)
graph.add_node("node3", node3)
# 添加条件边:参数为(当前节点, 路由函数, {条件标签: 目标节点})
graph.add_conditional_edges(
START,
route_by_sentiment,
{
"condition_1": "node1",
"condition_2": "node2",
"condition_3": "node3"
}
)
# 所有处理节点都连接到END
graph.add_edge("node1", END)
graph.add_edge("node2", END)
graph.add_edge("node3", END)
app = graph.compile()
图可视化:LangGraph 支持将图结构可视化为图片,便于检查和验证工作流设计。
png_data = app.get_graph().draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(png_data)

2.5 Send 和 Command
Send和Command是两种用于实现高级工作流控制的机制,支持动态决定下一步执行哪个节点。
1. Send:用于动态创建多个执行分支,实现并行处理。每个Send对象指定一个目标节点和传递参数,LangGraph会并行执行这些任务。适用于Map-Reduce等场景。

def route_tasks(state: MapReduceState) -> list[Send]:
# 为每个任务创建一个Send对象
sends = []
for idx, task in enumerate(state['tasks']):
# 创建Send任务及相应的参数
send = Send("process_task",{"task_id": idx,"task_name": task})
sends.append(send)
# 返回所有的目标节点
return sends
# 路由函数,返回 Send 列表
graph.add_conditional_edges("generate_tasks", route_tasks)
# 所有process_task完成后,汇总结果
graph.add_edge("process_task", "reduce_results")
2. Command:不仅可以指定下一个节点,还支持更新状态、处理中断恢复以及在嵌套图之间导航。常用于复杂的人机交互和多智能体协同中的执行权交接(handoffs)。

# 在节点函数中返回 Command 来实现动态路由
def agent_node(state: State) -> Command:
if need_help(state):
# 决定将任务移交给另一个node,并更新状态
return Command(
goto="expert_agent",
update={"messages": state["messages"] + [new_message]}
)
else:
return Command(goto="END")
Command与条件边的主要区别在于:条件边仅路由到下一个节点,而Command在路由的同时还能更新状态。当需要同时更新状态并路由到不同节点时,应使用Command。
2.6 状态持久化

状态持久化指在程序运行时将瞬间状态保存下来,以便后续恢复执行,防止因程序退出或重启而导致任务丢失。在LangGraph中,如果启用了持久化,工作流每个步骤结束后,系统会自动将当前整个图的状态完整保存为一个检查点(Checkpoint)。检查点通过thread_id(会话ID)区分不同的会话,支持存储在内存、Redis、数据库等介质中。
memory = MemorySaver()
app = graph.compile(checkpointer=memory) # 使用内存保存检查点
config = {"configurable": {"thread_id": "recovery_thread"}} # 必须配置会话ID
result = app.invoke({"value": 5, "operations": []}, config=config)
# 获取所有的检查点
checkpoints = list(app.get_state_history(config))
# 恢复:从指定检查点继续执行
recovery_config = checkpoints[2].config
recovered_result = app.invoke(None, config=recovery_config)
检查点由StateSnapshot对象表示,包含config(配置)、values(状态值)、next(下一步节点)、metadata(元数据)等关键属性。
2.7 时间旅行
如果启用了状态持久化,LangGraph会在每个节点执行后保存完整状态。时间旅行功能允许回溯、检查和修改工作流执行过程中的历史状态,并从某个历史节点重新执行。这对于调试、审计和路径探索(尝试不同决策分支)非常有用。
checkpoints = list(app.get_state_history(config))
# 查看所有的步骤(注意:checkpoints列表顺序与执行顺序相反)
for i, checkpoint in enumerate(checkpoints):
print(f"步骤 {i}: 下一节点 {checkpoint.next}, 状态值 {checkpoint.values}")
# 获取一个检查点
checkpoint = checkpoints[2]
# 更新状态(这会替换整个data_list)
app.update_state(
checkpoint.config,
{"data_list": ["updated_value"]} # 完全替换状态
)
# 从更新后的检查点继续执行
result = app.invoke(None, config=checkpoint.config)

2.8 人机协作(Human-in-the-Loop)

人机协作(HIL)通过在关键节点引入人工干预,弥补AI的“能力盲区”和人类的“效率瓶颈”,在保证处理速度的同时,提升结果的准确性、安全性和适用性。LangGraph通过中断机制、状态持久化和恢复执行机制,在Agent自动化工作流中无缝嵌入人工干预。
def human_feedback_node(state: HumanInLoopState) -> dict:
# 定义中断信息
interrupt_data = {
"type": "human_review",
"request": state['request'],
"analysis": state['analysis'],
"prompt": "请输入: 同意 / 拒绝"
}
# 使用interrupt()函数暂停工作流,等待人工输入
human_response = interrupt(interrupt_data)
print(f"收到用户输入: {human_response}")
# 解析人工输入,其他业务逻辑...
return {
"human_feedback": human_response.get("feedback"),
"approved": human_response.get("decision"),
"messages": [f"人工反馈: {human_response.get('feedback')}"]
}
# 添加人工反馈节点
graph.add_node("human_feedback", human_feedback_node)
graph.add_edge("analyze", "human_feedback")
# 添加条件边,根据用户反馈选择后续节点
builder.add_conditional_edges(
"human_feedback",
route_by_human_decision,
{
"process_approval": "process_approval",
"process_rejection": "process_rejection"
}
)
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 首次执行,执行到human_feedback节点会中断,invoke立即返回包含中断信息的结果
result = graph.invoke(initial_input, config)
# 模拟人工输入
human_decision = {
"decision": "同意",
"feedback": "用户反馈"
}
# 重新恢复工作流,继续执行后续节点
resume_command = Command(resume=human_decision)
final_result = graph.invoke(resume_command, config)

注意:调用interrupt()函数后,当次的invoke调用会正常结束,并将包含中断信息的结果返回给调用方。等重新调用graph.invoke(resume_command, config)时,会从调用interrupt()的入口处重新执行(interrupt之前的逻辑会被重复执行),且interrupt()的返回值即为用户输入的值。

2.9 记忆

记忆是智能体记住先前交互信息以实现连贯对话的核心能力。LangGraph提供了短期记忆和长期记忆。
1. 短期记忆:存储当前对话上下文信息,作用于单次会话或线程,通过thread_id区分。通过图状态(State)和检查点(Checkpoint)实现。
# 1. 初始化一个内存检查点
checkpointer = InMemorySaver()
# 2. 在编译图时传入检查点
graph = builder.compile(checkpointer=checkpointer)
# 3. 调用时通过 thread_id 指定会话线程
config = {"configurable": {"thread_id": "thread_123"}}
result = graph.invoke({"messages": [{"role": "user", "content": "你好"}]}, config=config)
2. 长期记忆:用于存储需要在不同会话间保留的信息。通过存储库(Store)接口实现,类似于一个键值数据库,并支持基于向量嵌入的语义检索。长期记忆保存在自定义的“命名空间”中。
def write_node(state: dict) -> dict:
# 获取全局存储实例
store = get_store()
# 存储数据到指定命名空间
store.put(namespace, "user_123", {"name": "张三", "age": "20"})
return {}
def read_node(state: dict) -> dict:
# 获取全局存储实例
store = get_store()
# 根据键获取指定用户数据
user_info = store.get(namespace, "user_123")
print(user_info)
# 在命名空间中搜索包含"张三"的数据
user_info = store.search(namespace, query="张三", limit=10)
print(user_info)
return {}
# 初始化一个内存存储
store = InMemoryStore()
# 定义命名空间,用于数据分类和隔离
namespace = ("users", "profile")
# 创建图并指定存储
app = graph.compile(store=store)
app.invoke({})

2.10 子图
LangGraph允许将一个完整的图作为另一个图的节点,这适用于将复杂任务拆解为多个专业智能体协同完成。每个子图都可以独立开发、测试和复用,并可以拥有自己的私有数据或与父图共享数据。

# 定义父图状态
class ParentState(TypedDict):
parent_messages: list # 与子图共享数据
# 定义子图状态
class SubgraphState(TypedDict):
parent_messages: list # 与父图共享的数据
sub_message: str # 子图私有数据
# 创建子图
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_node", subgraph_node)
sub_builder.add_edge(START, "sub_node")
compiled_subgraph = sub_builder.compile()
# 创建父图
builder = StateGraph(ParentState)
# 添加子图作为父图的节点
builder.add_node("subgraph_node", compiled_subgraph)
builder.add_edge("parent_node", "subgraph_node")
# 编译父图并执行
parent_graph = builder.compile()
parent_graph.invoke({"messages": ["init message"]})
共享数据规则:如果父图状态与子图状态中定义的字段名相同,则状态是共享的。
如果父子图状态结构不同,则需要在父图中创建一个代理节点函数,手动调用子图并处理状态转换。
# 在父图中创建代理节点处理状态转换
def call_subgraph(state: ParentState):
# 将父图状态转换为子图的输入
subgraph_input = {"analysis_input": state["user_query"]}
# 调用子图
subgraph_response = compiled_subgraph.invoke(subgraph_input)
# 将子图的输出映射回父图状态
return {"final_answer": subgraph_response["analysis_result"]}
builder = StateGraph(ParentState)
# 父图中添加的是代理节点,而不是直接添加子图
builder.add_node("call_subgraph_node", call_subgraph)
builder.add_edge(START, "call_subgraph_node")
parent_graph = builder.compile()
2.11 集成MCP
模型上下文协议(MCP)是一个开放协议,标准化了应用程序如何向大模型提供工具和上下文。通过 langchain-mcp-adapters 库,LangGraph中的Agent可以使用在MCP服务器上定义的工具。
# 安装
pip install langchain-mcp-adapters


1. 自定义MCP工具
# 创建名为"MCP_Tools"的MCP服务器
mcp = FastMCP("MCP_Tools")
@mcp.tool()
def get_weather(location: str) -> str:
"""获取指定位置的天气信息"""
return "晴天"
@mcp.tool()
def get_time() -> str:
"""获取当前时间"""
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@mcp.tool()
def add(a: int, b: int) -> int:
"""对两个整数相加"""
return a + b
@mcp.tool()
def multiply(a: int, b: int) -> int:
"""对两个整数相乘"""
return a * b
@mcp.tool()
def subtract(a: int, b: int) -> int:
"""对两个整数相减"""
return a - b
if __name__ == "__main__":
# 使用HTTP协议传输
mcp.run(transport="streamable-http")
2. 创建ReAct类型的Agent
async def get_agent():
# 初始化MCP客户端,可以连接多个服务器
client = MultiServerMCPClient(
{
"weather": {
"url": "http://localhost:8000/mcp",
"transport": "streamable_http",
}
}
)
# 获取所有可用的工具
tools = await client.get_tools()
print(f"已加载工具: {[tool.name for tool in tools]}")
# 初始化聊天模型
model = ChatOpenAI(
model_name=model_name,
base_url=base_url,
api_key=api_key
)
# 创建React智能体
return create_react_agent(model, tools)
3. 构建测试用例
async def test_agent():
# 获取智能体
agent = await get_agent()
# 测试用例
test_cases = [
"计算 (15 + 7) × 3 等于多少?",
"先计算 20 减 8,然后乘以 2 是多少?",
"现在几点了?深圳的天气如何?"
]
for i, question in enumerate(test_cases, 1):
print(f"\n{'=' * 50}")
print(f"测试 {i}: {question}")
print(f"{'=' * 50}")
# 调用智能体
response = await agent.ainvoke(
{"messages": [HumanMessage(content=question)]}
)
# 获取最后一条消息(智能体的回复)
last_message = response["messages"][-1]
print(f"智能体回复: {last_message.content}")
if __name__ == "__main__":
asyncio.run(test_agent())
运行结果:

2.12 运行时上下文
创建图时可以指定运行时上下文,将不属于图状态的信息(如模型名称、数据库连接等)传递给节点使用。
@dataclass
class ContextSchema: # 定义上下文schema
llm_provider: str = "openai"
def node_a(state: State, runtime: Runtime[ContextSchema]):
# 获取上下文信息
llm = get_llm(runtime.context.llm_provider)
return state
graph = StateGraph(State, context_schema=ContextSchema)
# 执行时指定上下文信息
graph.invoke(inputs, context={"llm_provider": "DeepSeek-R1-Online-0120"})
2.13 递归限制
递归限制指图在单次执行过程中的最大步数,由recursion_limit参数控制,默认值为25步。一旦超过限制,会抛出GraphRecursionError错误,用于防止工作流陷入死循环,确保系统稳定性和可预测性。
try:
result = graph.invoke(
{"input": "开始执行"},
config={"recursion_limit": 50} # 设置递归限制为50次
)
except GraphRecursionError:
print("执行步数超过限制,抛出异常")
# 异常处理...
3. Multi-Agent架构
3.1 多智能体架构概述
随着业务迭代,单智能体应用的复杂度可能急剧上升,导致维护和扩展困难。常见问题包括:工具过多导致决策错误、上下文过于复杂难以跟踪、需要多个专业领域智能体协同。多智能体系统通过将应用分解为多个更小、独立的智能体来解决这些问题,其优势在于:
- 模块化:独立的智能体使得开发、测试和维护更加容易。
- 专业化:可以创建专注于特定领域的专家智能体,提高系统整体性能。
- 可控性:可以明确控制智能体之间的通信方式。
多智能体架构:

常见的智能体连接方式包括:
- Network(网络):每个智能体都可以与其他任何智能体直接通信。
- Supervisor(主管):一个中央主管智能体协调所有子智能体,决定任务委派。
- Supervisor as tools(主管作为工具调用):主管架构的特例,单个智能体被表示为工具,主管智能体通过工具调用来委派任务。
- Hierarchical(层级式):多层Supervisor架构,类似于公司组织架构。
- Custom(自定义):利用LangGraph灵活的图结构和条件边,自定义各种执行流,最为灵活。
3.2 Agent之间通信和状态管理
构建多智能体应用时,需考虑智能体间的交互方式和数据共享策略。

1. 通信模式:常见的有移交(handoffs)和工具调用。
- 移交:一个智能体将其执行上下文和执行权传递给另一个智能体,适用于自主协作场景。
- 工具调用:一个智能体(如主管)将另一个智能体作为工具进行调用,提供了更明确的层级控制和接口约束。
2. 消息传递策略:

- 共享完整推理数据:智能体将所有中间步骤写入共享通道,有利于其他智能体理解其推理过程,提升协作能力,但可能导致状态空间快速膨胀。
- 仅共享最终结果:智能体在私有空间内完成计算,仅将最终结果写入共享区,能有效控制状态复杂度,但需要为每个智能体定义独立的状态模式。
3.3 Supervisor(主管)架构
每个子智能体由一个中央主管智能体协调。主管控制所有通信流和任务委派,根据当前上下文和任务需求决定调用哪个智能体。Supervisor架构模仿了企业中“项目经理”的角色,采用经典的“管理者-工作者”结构。

安装Supervisor库:
pip install langgraph-supervisor

示例:航班与酒店预订系统
def book_hotel(hotel_name: str):
"""预订酒店"""
print(f"✅ 成功预订了 {hotel_name} 的住宿")
return f"成功预订了 {hotel_name} 的住宿。"
def book_flight(from_airport: str, to_airport: str):
"""预订航班"""
print(f"✅ 成功预订了从 {from_airport} 到 {to_airport} 的航班")
return f"成功预订了从 {from_airport} 到 {to_airport} 的航班。"
# 创建航班预订助手
flight_assistant = create_react_agent(
model=model,
tools=[book_flight],
prompt="你是专业的航班预订助手...",
name="flight_assistant"
)
# 创建酒店预订助手
hotel_assistant = create_react_agent(
model=model,
tools=[book_hotel],
prompt="你是专业的酒店预订助手...",
name="hotel_assistant"
)
# 创建主管智能体
supervisor = create_supervisor(
agents=[flight_assistant, hotel_assistant],
model=model,
prompt="你是一个智能任务调度主管..."
).compile()
# 执行任务
for chunk in supervisor.stream(
{
"messages": [
{
"role": "user",
"content": "帮我预定一个北京到深圳的机票,并且预定一个酒店"
}
]
}
):
print(chunk)
print("\n")

输出模式:supervisor支持将每个工作Agent的全部消息或最后一条消息添加到整个消息列表中。
# 添加所有消息
workflow = create_supervisor(
agents=[agent1, agent2],
output_mode="full_history")
# 添加最后一条消息
workflow = create_supervisor(
agents=[agent1, agent2],
output_mode="last_message")

层级式Supervisor:每个主管Agent也可以是一个工作Agent,由更顶层的主管管理。
research_team = create_supervisor(
[research_agent, math_agent],
model=model,
supervisor_name="research_supervisor").compile(name="research_team")
writing_team = create_supervisor(
[writing_agent, publishing_agent],
model=model,
supervisor_name="writing_supervisor").compile(name="writing_team")
top_level_supervisor = create_supervisor(
[research_team, writing_team],
model=model,
supervisor_name="top_level_supervisor").compile(name="top_level_supervisor")
添加记忆:
# 短期记忆
checkpointer = InMemorySaver()
# 长期记忆
store = InMemoryStore()
swarm = create_supervisor(
agents=[flight_assistant, hotel_assistant],
model=model,
).compile(checkpointer=checkpointer, store=store)
3.4 Swarm(群组)架构
智能体根据各自的专长动态地将控制权移交给其他智能体。Swarm架构更像一个开放的“专家社区”,没有中心指挥,每个专业智能体都具备自主判断能力,可以根据当前任务上下文决定是否及将控制权“移交”给另一个智能体,形成自然的协作流水线。

安装Swarm库:
pip install langgraph-swarm

示例:航班与酒店预订Swarm
def book_hotel(hotel_name: str):
"""预订酒店"""
print(f"✅ 成功预订了 {hotel_name} 的住宿")
return f"成功预订了 {hotel_name} 的住宿。"
def book_flight(from_airport: str, to_airport: str):
"""预订航班"""
print(f"✅ 成功预订了从 {from_airport} 到 {to_airport} 的航班")
return f"成功预订了从 {from_airport} 到 {to_airport} 的航班。"
# 创建移交工具
transfer_to_hotel_assistant = create_handoff_tool(
agent_name="hotel_assistant",
description="将用户转接给酒店预订助手。当用户需要预订酒店时使用此工具。",
)
transfer_to_flight_assistant = create_handoff_tool(
agent_name="flight_assistant",
description="将用户转接给航班预订助手。当用户需要预订航班时使用此工具。",
)
# 创建航班助手(具备移交能力)
flight_assistant = create_react_agent(
model=flight_assistant_model,
tools=[book_flight, transfer_to_hotel_assistant],
prompt="你是一个航班预订助手...",
name="flight_assistant"
)
# 创建酒店助手(具备移交能力)
hotel_assistant = create_react_agent(
model=hotel_assistant_model,
tools=[book_hotel, transfer_to_flight_assistant],
prompt="你是一个酒店预订助手...",
name="hotel_assistant"
)
# 创建Swarm,并指定默认启动的智能体
swarm = create_swarm(
agents=[flight_assistant, hotel_assistant],
default_active_agent="flight_assistant"
).compile()
# 执行任务
for chunk in swarm.stream(
{
"messages": [
HumanMessage(content="帮我预订从北京到上海的航班,并预订如家酒店")
]
}
):
print(chunk)
print("\n")

添加记忆:
# 短期记忆
checkpointer = InMemorySaver()
# 长期记忆
store = InMemoryStore()
swarm = create_swarm(
agents=[flight_assistant, hotel_assistant],
default_active_agent="flight_assistant"
).compile(checkpointer=checkpointer, store=store)
Supervisor 与 Swarm 的选择:
- Supervisor:通过集中控制带来可预测性和可靠性,适用于流程严谨、需要严格管控的核心业务。
- Swarm:通过去中心化设计带来灵活性和韧性,适用于探索性强、需要动态协作的非核心环节。
在实际复杂系统中,可以混合使用两种模式。
3.5 Handoffs(交接)
Handoffs指一个智能体将控制权交接给另一个智能体,Supervisor和Swarm都默认使用了create_handoff_tool移交工具。我们也可以自定义交接函数,其核心要素是:目的地(下一个智能体)和传递的State(信息)。
def create_task_description_handoff_tool(*, agent_name: str, description: str | None = None):
name = f"transfer_to_{agent_name}"
description = description or f"移交给 {agent_name}"
@tool(name, description=description)
def handoff_tool(
task_description: Annotated[str, "描述下一个Agent应该做什么,包括所有相关信息。"],
state: Annotated[MessagesState, InjectedState],
) -> Command:
task_description_message = {"role": "user", "content": task_description}
agent_input = {**state, "messages": [task_description_message]}
return Command(
goto=[Send(agent_name, agent_input)],
graph=Command.PARENT,
)
return handoff_tool
# 使用自定义移交工具构建多智能体图
transfer_to_hotel_assistant = create_task_description_handoff_tool(
agent_name="hotel_assistant",
description="将执行权移交给酒店预订助手",
)
# ... 创建智能体,构建图并执行

注意:上述Supervisor、Swarm和自定义Handoffs的例子在实际测试中,运行稳定性可能受模型和提示词影响,有时并非完全按照预期执行。可以通过更换模型或精细调整提示词来尝试解决。
4. JAVA版本介绍(LangChain4J和LangGraph4J)
LangGraph除了Python和JS版本,还提供了Java版本(LangGraph4J)。对于使用Java技术栈的团队或需要开发复杂业务系统的场景,LangGraph4J是一个不错的选择。结合LangChain4J可以快速将AI大模型能力引入Java项目。由于Spring AI对Spring Boot 3.x和JDK 21有要求,而LangGraph4J是一个独立的库,仅需JDK 17,引入成本更低。
本章主要介绍LangGraph4J的基本使用,其核心概念与Python版本一致,可参考上文。
4.1 环境准备
Maven依赖:
<!-- LangChain4J -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j</artifactId>
<version>1.6.0</version>
</dependency>
<!-- LangChain4j OpenAI -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId>
<version>1.2.0</version>
</dependency>
<!-- LangGraph4J -->
<dependency>
<groupId>org.bsc.langgraph4j</groupId>
<artifactId>langgraph4j-core</artifactId>
<version>1.5.2</version>
</dependency>
4.2 使用LangChain4J集成大模型
1. 调用大模型
public static void main(String[] args) {
// 构建聊天模型实例
ChatModel chatModel = OpenAiChatModel.builder()
.baseUrl(BASE_URL) // 设置 API 基础地址
.apiKey(API_KEY) // 设置 API 密钥
.modelName("hunyuan-turbo") // 指定模型名称
.timeout(Duration.ofSeconds(60))// 设置请求超时时间为 60 秒
.logRequests(true) // 开启请求日志,便于调试
.logResponses(true) // 开启响应日志,便于调试
.maxRetries(3) // 设置最大重试次数为 3 次
.temperature(0.8) // 设置温度参数(0.0-1.0),控制输出的随机性
.returnThinking(true) // 返回模型的思考过程(针对深思考模型)
.build();
// 创建系统消息和用户消息
SystemMessage systemMessage = SystemMessage.from("你是一个LangChain和LangGraph专家,用于解答开发者的问题。");
UserMessage userMessage = UserMessage.from("介绍一下LangGraph");
// 模型调用
ChatResponse chatResponse = chatModel.chat(systemMessage, userMessage);
AiMessage aiMessage = chatResponse.aiMessage();
System.out.println(aiMessage.thinking()); // 输出思考过程
System.out.println(aiMessage.text()); // 输出最终回答
}
2. 提示词模板
// 创建提示词模板
PromptTemplate promptTemplate = PromptTemplate.from("你是一个{{domain}}领域的专家,用于解答关于{{question}}的开发者问题。");
// 填充参数
String prompt = promptTemplate.apply(Map.of(
"domain", "LangChain和LangGraph",
"question", "LangGraph"
)).text();
chatModel.chat(prompt);
3. AI Service
AI Service是LangChain4j中一个声明式的高级API,允许像定义普通Java Service接口一样定义AI功能。
// 定义一个反洗钱助手接口
interface RiskAssistant {
@SystemMessage("你是一个专注于反洗钱业务的专家助手")
@UserMessage("请回答用户关于反洗钱的提问,问题:{{question}}")
String answer(@V("question") String question);
}
public static void main(String[] args) {
// 构建聊天模型实例
ChatModel chatModel = OpenAiChatModel.builder().baseUrl(BASE_URL)
.apiKey(API_KEY)
.modelName("hunyuan-turbo")
.build();
// 通过 AiServices 创建实例
RiskAssistant riskAssistant = AiServices.create(RiskAssistant.class, chatModel);
String answer = riskAssistant.answer("什么是EDD?");
System.out.println(answer);
}
4. 添加记忆
RiskAssistant riskAssistant = AiServices.builder(RiskAssistant.class)
.chatModel(chatModel)
// 添加记忆能力,保存用户最近 10 条对话
.chatMemory(MessageWindowChatMemory.withMaxMessages(10))
.build();
5. 使用工具
public static class StockTools {
@Tool("查询公司股价")
public String getStockPrice(@P("公司名称") String company) {
return "1000";
}
}
public static void main(String[] args) {
StockAssistant assistant = AiServices.builder(StockAssistant.class)
.chatModel(chatModel)
// 添加工具
.tools(new StockTools())
.build();
}
6. Guardrail(防护机制)
通过预设规则验证和过滤模型的输入与输出,确保交互过程的安全、可靠和合规。
- 输入 Guardrail:在用户输入发送给LLM之前执行,用于敏感词过滤、提示注入攻击防护等。
- 输出 Guardrail:在LLM生成响应之后、返回给用户之前执行,用于内容安全审核、幻觉检测等。
7. 多模态
ChatModel chatModel = OpenAiChatModel.builder()
.baseUrl(BASE_URL)
.apiKey(API_KEY)
.modelName("hunyuan-ocr") // 设置模型名称,需支持多模态
.build();
byte[] imageBytes = Files.readAllBytes(Paths.get(IMAGE_PATH));
String base64ImageData = Base64.getEncoder().encodeToString(imageBytes);
// 创建包含文本和图片内容的多模态用户消息
UserMessage userMessage = UserMessage.from(
TextContent.from("描述图片的内容"),
ImageContent.from(base64ImageData, "image/png")
);
ChatResponse chat = chatModel.chat(userMessage);
4.3 使用LangGraph4J构建工作流
1. 创建图(Node、Edge、State)
public static void main(String[] args) throws GraphStateException {
StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
// 添加节点
graph.addNode("input_node", AsyncNodeAction.node_async(state -> {
System.out.println("[input_node] 接收到状态: " + state.data());
return Map.of("input_node", "input_node");
}));
graph.addNode("process_node", AsyncNodeAction.node_async(state -> {
System.out.println("[process_node] 接收到状态: " + state.data());
return Map.of("process_node", "process_node");
}));
// 添加边: START -> input_node -> process_node -> END
graph.addEdge(StateGraph.START, "input_node");
graph.addEdge("input_node", "process_node");
graph.addEdge("process_node", StateGraph.END);
// 编译并执行图
CompiledGraph<AgentState> compile = graph.compile();
Map<String, Object> initialData = new HashMap<>();
initialData.put("init_data", "init_data");
Optional<AgentState> invoke = compile.invoke(initialData);
invoke.ifPresent(state -> System.out.println("最终状态: " + state.data()));
}
2. 状态合并策略(Channels,类似于Python的Reducer)
public static void main(String[] args) throws GraphStateException {
// 定义Channels,指定每个状态字段的合并策略
Map<String, Channel<?>> channels = new LinkedHashMap<>();
channels.put("messages", Channels.appender(ArrayList::new)); // 集合追加
channels.put("counter", Channels.base(Integer::sum, () -> 0)); // 数字求和
channels.put("max_score", Channels.base(Math::max, () -> 0)); // 返回最大值
// 创建图,并指定状态字段合并策略
StateGraph<AgentState> graph = new StateGraph<>(channels, AgentState::new);
// ... 添加节点和边,编译并执行
}
3. 条件边
public static void main(String[] args) throws GraphStateException {
StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
// ... 添加节点、其他边
// 定义条件映射关系
Map<String, String> mappings = new HashMap<>();
mappings.put("pass", "pass_handler");
mappings.put("fail", "fail_handler");
graph.addConditionalEdges("node", agentState -> {
// 自定义路由条件:根据State中的score字段判断
int score = (Integer) agentState.value("score").orElse(0);
return CompletableFuture.completedFuture(score >= 90 ? "pass" : "fail");
}, mappings);
}
4. 检查点(Checkpoint)
public static void main(String[] args) throws GraphStateException {
// 定义检查点保存器
MemorySaver checkpoint = new MemorySaver();
CompileConfig config = CompileConfig.builder()
.checkpointSaver(checkpoint)
.build();
StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
// ... 添加节点、边
// 编译图时指定检查点保存器
CompiledGraph<AgentState> compile = graph.compile(config);
compile.invoke(new HashMap<>());
}
5. 人机协作(Human-in-the-Loop)
LangGraph4j提供了标准AgentExecutor类(即ReAct Agent)来支持人工审批工作流程,其实现思想与Python版本类似,通过中断机制和状态恢复实现人机协同。
至此,关于LangGraph的介绍就全部结束了。LangGraph作为多智能体应用的编排框架,通过图结构、灵活的状态管理和控制流,为构建复杂的多智能体应用提供了强大的基础设施。它作为LangChain生态的一部分,可以直接利用LangChain庞大的工具库和模型集成,快速搭建起强大的应用。无论是Python、JS还是Java版本,开发者都可以根据具体的应用场景和团队技术栈选择最合适的工具。
参考资料