在物流调度、城市配送乃至油田运输等真实业务场景中,一个能提供确定性、可审计结果的路线规划系统至关重要。传统的纯文本大语言模型(LLM)往往会产生“幻觉距离”或随机ETA,这在实际生产中是不可接受的。
因此,本文将手把手教你构建一个生产级的路线优化智能体(Route Optimizer Agent)。其核心设计哲学是 “模型负责策略选择,工具负责精确计算” ,确保每一次距离和耗时计算都通过可验证的工具完成,输出完全结构化的结果。
整个系统基于 LangChain 的最新 Agent 工具驱动推理(Tool-driven Reasoning)框架,目标明确:
- 🌟 保证计算确定性:距离与ETA由工具计算,而非模型猜测。
- 🧩 强结构化输出:使用Pydantic Schema,保证输出机器可读。
- 🛠 用工具取代推测:所有计算必须调用工具,LLM无权“编造”。
- 🚦 支持多站点、多约束:处理途经点、道路等级等多种优化目标。
- 📑 可审计、可追踪:便于与现有调度系统集成。
下面,我们从零开始,实现这个从“能跑”到“能投产”的智能体。
直接让 LLM 回答“从A到B要多久?”,它可能会基于训练数据“想象”出一个距离和速度,从而产生:
- ❌ “幻觉距离”:模型虚构的地理数据。
- ❌ 随机速度与ETA:每次回答可能不一致。
- ❌ 输出不可解析:返回自由文本,下游系统难以处理。
- ❌ 不稳定,不可审计:无法追溯计算依据。
而生产调度要求的是:✔ 确定性 ✔ 可复现 ✔ 结构化 ✔ 审计性。
我们的解决方案是构建一种 “工具驱动的确定性计算(Tool-Driven Deterministic Computation)” 工作流。LLM在这里扮演决策者,它根据任务描述决定何时、如何使用我们提供的专业工具(如计算距离、寻找最优路径),而所有具体的数学计算和业务逻辑都封装在工具内部。LLM不知道具体算法,它必须调用工具才能得到结果。
环境初始化:安装依赖与准备工具链
首先,安装必要的 Python 库并设置环境。
!pip -q install -U langchain langchain-openai pydantic
import os
from getpass import getpass
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ")
from typing import Dict, List, Optional, Tuple, Any
from math import radians, sin, cos, sqrt, atan2
from pydantic import BaseModel, Field, ValidationError
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.agents import create_agent
这段代码完成了依赖安装、OpenAI API密钥设置,并导入了后续所需的核心模块。
定义业务数据:站点地图与速度配置
任何路线系统都始于一张地图。这里我们定义一组模拟站点及其经纬度,以及不同道路等级的行驶速度。
SITES: Dict[str, Dict[str, Any]] = {
"Rig_A": {"lat": 23.5880, "lon": 58.3829, "type": "rig"},
"Rig_B": {"lat": 23.6100, "lon": 58.5400, "type": "rig"},
"Rig_C": {"lat": 23.4500, "lon": 58.3000, "type": "rig"},
"Yard_Main": {"lat": 23.5700, "lon": 58.4100, "type": "yard"},
"Depot_1": {"lat": 23.5200, "lon": 58.4700, "type": "depot"},
"Depot_2": {"lat": 23.6400, "lon": 58.4300, "type": "depot"},
}
SPEED_PROFILES: Dict[str, float] = {
"highway": 90.0,
"arterial": 65.0,
"local": 45.0,
}
DEFAULT_TRAFFIC_MULTIPLIER = 1.10
特点:
- 明确的地点经纬度:真实坐标数据。
- 明确的道路限速:区分高速、主干道、本地道路。
- 明确的交通拥堵因子:默认增加10%的行程时间。
基于此,路线ETA将严格遵循公式:ETA = 距离 / 速度 × 交通因子,计算结果完全确定。
核心计算模块:Haversine距离与路段ETA
计算两点间距离我们采用地理信息系统(GIS)中常用的Haversine公式,它基于球面模型计算大圆距离,精度足以满足大多数物流场景。
def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
R = 6371.0
dlat = radians(lat2 - lat1)
dlon = radians(lon2 - lon1)
a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
c = 2 * atan2(sqrt(a), sqrt(1 - a))
return R * c
基于距离,我们可以计算单段路的预计行驶时间(ETA)。
def _normalize_site_name(name: str) -> str:
return name.strip()
def _assert_site_exists(name: str) -> None:
if name not in SITES:
raise ValueError(f"Unknown site '{name}'. Use list_sites() or suggest_site().")
def _distance_between(a: str, b: str) -> float:
_assert_site_exists(a)
_assert_site_exists(b)
sa, sb = SITES[a], SITES[b]
return float(haversine_km(sa["lat"], sa["lon"], sb["lat"], sb["lon"]))
def _eta_minutes(distance_km: float, speed_kmph: float, traffic_multiplier: float) -> float:
speed = max(float(speed_kmph), 1e-6)
base_minutes = (distance_km / speed) * 60.0
return float(base_minutes * max(float(traffic_multiplier), 0.0))
def compute_route_metrics(path: List[str], speed_kmph: float, traffic_multiplier: float) -> Dict[str, Any]:
if len(path) < 2:
raise ValueError("Route path must include at least origin and destination.")
for s in path:
_assert_site_exists(s)
legs = []
total_km = 0.0
total_min = 0.0
for i in range(len(path) - 1):
a, b = path[i], path[i + 1]
d_km = _distance_between(a, b)
t_min = _eta_minutes(d_km, speed_kmph, traffic_multiplier)
legs.append({"from": a, "to": b, "distance_km": d_km, "eta_minutes": t_min})
total_km += d_km
total_min += t_min
return {"route": path, "distance_km": float(total_km), "eta_minutes": float(total_min), "legs": legs}
compute_route_metrics 函数是核心,它接收一个路径列表(如 ["Yard_Main", "Depot_1", "Rig_B"]),并计算出整条路线的总距离、总耗时以及每一段的明细。
路线生成:枚举所有可能路径并寻优
当存在多个可选途经点时,系统需要枚举所有合理的路径排列,并从中找出最优解。我们支持两种优化目标:最短时间(ETA) 或 最短距离。
def _all_paths_with_waypoints(origin: str, destination: str, waypoints: List[str], max_stops: int) -> List[List[str]]:
from itertools import permutations
waypoints = [w for w in waypoints if w not in (origin, destination)]
max_stops = int(max(0, max_stops))
candidates = []
for k in range(0, min(len(waypoints), max_stops) + 1):
for perm in permutations(waypoints, k):
candidates.append([origin, *perm, destination])
if [origin, destination] not in candidates:
candidates.insert(0, [origin, destination])
return candidates
def find_best_route(origin: str, destination: str, allowed_waypoints: Optional[List[str]], max_stops: int, speed_kmph: float, traffic_multiplier: float, objective: str, top_k: int) -> Dict[str, Any]:
origin = _normalize_site_name(origin)
destination = _normalize_site_name(destination)
_assert_site_exists(origin)
_assert_site_exists(destination)
allowed_waypoints = allowed_waypoints or []
for w in allowed_waypoints:
_assert_site_exists(_normalize_site_name(w))
objective = (objective or "eta").strip().lower()
if objective not in {"eta", "distance"}:
raise ValueError("objective must be one of: 'eta', 'distance'")
top_k = max(1, int(top_k))
candidates = _all_paths_with_waypoints(origin, destination, allowed_waypoints, max_stops=max_stops)
scored = []
for path in candidates:
metrics = compute_route_metrics(path, speed_kmph=speed_kmph, traffic_multiplier=traffic_multiplier)
score = metrics["eta_minutes"] if objective == "eta" else metrics["distance_km"]
scored.append((score, metrics))
scored.sort(key=lambda x: x[0])
best = scored[0][1]
alternatives = [m for _, m in scored[1:top_k]]
return {"best": best, "alternatives": alternatives, "objective": objective}
函数 find_best_route 是路线优化的“大脑”,它根据约束条件生成候选路径,计算每条路径的指标,并按优化目标(objective="eta"或"distance")排序,返回最优解和Top K候选方案。
构建工具层:赋予Agent“手脚”
现在,我们将上述计算能力封装成 LangChain Tool。这是智能体“可信”的根源,LLM必须通过调用这些工具来获取信息或执行计算。
@tool
def list_sites() -> str:
"""List all available site names."""
return "\n".join(sorted(SITES.keys()))
@tool
def get_site_details(site_name: str) -> str:
"""Get detailed information (lat, lon, type) for a specific site."""
_assert_site_exists(site_name)
info = SITES[site_name]
return f"{site_name}: lat={info['lat']}, lon={info['lon']}, type={info['type']}"
@tool
def suggest_site(keyword: str) -> str:
"""Suggest site names based on a keyword (fuzzy matching)."""
kw = keyword.lower().strip()
matches = [name for name in SITES if kw in name.lower()]
return "\n".join(matches) if matches else f"No sites found matching '{keyword}'."
@tool
def compute_direct_route(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER) -> Dict[str, Any]:
"""Compute direct route metrics between two sites."""
speed = SPEED_PROFILES.get(road_class, SPEED_PROFILES["arterial"])
return compute_route_metrics([origin, destination], speed_kmph=speed, traffic_multiplier=traffic_multiplier)
@tool
def optimize_route(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, objective: str = "eta", top_k: int = 3) -> Dict[str, Any]:
"""Find the optimal route, optionally via waypoints, returning the best and top alternatives."""
speed = SPEED_PROFILES.get(road_class, SPEED_PROFILES["arterial"])
return find_best_route(origin, destination, allowed_waypoints, max_stops, speed, traffic_multiplier, objective, top_k)
关键工具 optimize_route 封装了之前的寻优逻辑。LLM在思考时,一旦需要比较路径或计算综合最优,就会调用此工具。
定义结构化输出:用Pydantic保证接口规范
为了让输出能被下游系统直接消费,我们使用Pydantic定义严格的返回模式。
class RouteLeg(BaseModel):
from_site: str
to_site: str
distance_km: float
eta_minutes: float
class RoutePlan(BaseModel):
route: List[str]
distance_km: float
eta_minutes: float
legs: List[RouteLeg]
objective: str
class RouteDecision(BaseModel):
chosen: RoutePlan
alternatives: List[RoutePlan] = []
assumptions: Dict[str, Any] = {}
notes: str = ""
audit: List[str] = []
RouteDecision 是智能体的最终输出,包含:
chosen:选出的最佳路线方案。
alternatives:其他候选方案列表。
assumptions & notes:记录计算所基于的假设或额外说明。
audit:审计日志,记录决策过程中的关键步骤(例如调用了哪些工具)。
这确保了输出是标准的JSON对象,无需再从文本中解析信息。
组装Route Optimizer Agent
最后,我们将LLM、工具和输出格式绑定在一起,创建智能体。
SYSTEM_PROMPT = (
"You are the Route Optimizer Agent for a logistics dispatch center.\n"
"You MUST use tools for any distance/ETA calculation.\n"
"Return ONLY the structured RouteDecision."
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)
route_agent = create_agent(
model=llm,
tools=[list_sites, get_site_details, suggest_site, compute_direct_route, optimize_route],
system_prompt=SYSTEM_PROMPT,
response_format=RouteDecision,
)
def get_route_decision(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, objective: str = "eta", top_k: int = 3) -> RouteDecision:
user_msg = {
"role": "user",
"content": (
f"Optimize the route from {origin} to {destination}.\n"
f"road_class={road_class}, traffic_multiplier={traffic_multiplier}\n"
f"objective={objective}, top_k={top_k}\n"
f"allowed_waypoints={allowed_waypoints}, max_stops={max_stops}\n"
"Return the structured RouteDecision only."
),
}
result = route_agent.invoke({"messages": [user_msg]})
return result["structured_response"]
系统提示词(SYSTEM_PROMPT)强制Agent必须使用工具进行计算。temperature=0.2设置为低随机性,保证输出的稳定性。create_agent 函数将一切连接起来,并指定了 RouteDecision 作为响应格式。
实际调用示例
让我们看看这个智能体如何工作。
示例1:简单直达路线
decision1 = get_route_decision("Yard_Main", "Rig_B", road_class="arterial", traffic_multiplier=1.12)
print(decision1.model_dump())
Agent会理解任务,调用 optimize_route 工具(或先调用compute_direct_route),并返回结构化的 RouteDecision。
示例2:复杂多途经点优化
decision2 = get_route_decision("Rig_C", "Rig_B", road_class="highway", traffic_multiplier=1.08, allowed_waypoints=["Depot_1", "Depot_2", "Yard_Main"], max_stops=2, objective="eta", top_k=3)
print(decision2.model_dump())
在这个案例中,Agent需要处理最多2个途经点,在Depot_1、Depot_2、Yard_Main中排列组合,找出时间(ETA)最短的前3条路线。它会自动调用 optimize_route 工具完成所有繁重的计算和排序工作。
总结与展望
通过以上步骤,我们成功构建了一个具备生产级潜力的路线优化智能体:
- 💯 可生产部署:代码模块化,依赖清晰。
- 🔍 可审计:所有计算通过工具完成,有迹可循。
- 📊 结构化输出:Pydantic Schema保证接口一致性。
- 🔐 确定性运算:结果不依赖LLM的“想象”。
- 🧠 智能选择策略:LLM负责理解复杂约束并调用正确工具。
这个框架具有高度的可扩展性。你可以轻松地:
- 集成实时交通API(如高德、Google Maps),替换静态的
traffic_multiplier。
- 引入成本模型,将油价、司机工时费作为优化目标。
- 添加车辆约束(载重、体积、车长),使其升级为车辆路径问题(VRP)求解器。
- 连接数据库,动态加载站点和路网信息。
希望这篇实践指南能帮助你掌握构建确定性AI智能体的核心方法。在云栈社区,你可以找到更多关于Agent、大模型应用开发以及系统架构的深度讨论与资源。欢迎一起交流,将想法变为可落地的系统。