找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1677

积分

0

好友

217

主题
发表于 昨天 02:26 | 查看: 4| 回复: 0

在物流调度、城市配送乃至油田运输等真实业务场景中,一个能提供确定性可审计结果的路线规划系统至关重要。传统的纯文本大语言模型(LLM)往往会产生“幻觉距离”或随机ETA,这在实际生产中是不可接受的。

因此,本文将手把手教你构建一个生产级的路线优化智能体(Route Optimizer Agent)。其核心设计哲学是 “模型负责策略选择,工具负责精确计算” ,确保每一次距离和耗时计算都通过可验证的工具完成,输出完全结构化的结果。

整个系统基于 LangChain 的最新 Agent 工具驱动推理(Tool-driven Reasoning)框架,目标明确:

  • 🌟 保证计算确定性:距离与ETA由工具计算,而非模型猜测。
  • 🧩 强结构化输出:使用Pydantic Schema,保证输出机器可读。
  • 🛠 用工具取代推测:所有计算必须调用工具,LLM无权“编造”。
  • 🚦 支持多站点、多约束:处理途经点、道路等级等多种优化目标。
  • 📑 可审计、可追踪:便于与现有调度系统集成。

下面,我们从零开始,实现这个从“能跑”到“能投产”的智能体。

为什么路线优化必须用 Agent + Tools?

直接让 LLM 回答“从A到B要多久?”,它可能会基于训练数据“想象”出一个距离和速度,从而产生:

  • “幻觉距离”:模型虚构的地理数据。
  • 随机速度与ETA:每次回答可能不一致。
  • 输出不可解析:返回自由文本,下游系统难以处理。
  • 不稳定,不可审计:无法追溯计算依据。

而生产调度要求的是:✔ 确定性 ✔ 可复现 ✔ 结构化 ✔ 审计性

我们的解决方案是构建一种 “工具驱动的确定性计算(Tool-Driven Deterministic Computation)” 工作流。LLM在这里扮演决策者,它根据任务描述决定何时、如何使用我们提供的专业工具(如计算距离、寻找最优路径),而所有具体的数学计算和业务逻辑都封装在工具内部。LLM不知道具体算法,它必须调用工具才能得到结果。

环境初始化:安装依赖与准备工具链

首先,安装必要的 Python 库并设置环境。

!pip -q install -U langchain langchain-openai pydantic
import os
from getpass import getpass

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ")

from typing import Dict, List, Optional, Tuple, Any
from math import radians, sin, cos, sqrt, atan2

from pydantic import BaseModel, Field, ValidationError
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.agents import create_agent

这段代码完成了依赖安装、OpenAI API密钥设置,并导入了后续所需的核心模块。

定义业务数据:站点地图与速度配置

任何路线系统都始于一张地图。这里我们定义一组模拟站点及其经纬度,以及不同道路等级的行驶速度。

SITES: Dict[str, Dict[str, Any]] = {
    "Rig_A": {"lat": 23.5880, "lon": 58.3829, "type": "rig"},
    "Rig_B": {"lat": 23.6100, "lon": 58.5400, "type": "rig"},
    "Rig_C": {"lat": 23.4500, "lon": 58.3000, "type": "rig"},
    "Yard_Main": {"lat": 23.5700, "lon": 58.4100, "type": "yard"},
    "Depot_1": {"lat": 23.5200, "lon": 58.4700, "type": "depot"},
    "Depot_2": {"lat": 23.6400, "lon": 58.4300, "type": "depot"},
}

SPEED_PROFILES: Dict[str, float] = {
    "highway": 90.0,
    "arterial": 65.0,
    "local": 45.0,
}

DEFAULT_TRAFFIC_MULTIPLIER = 1.10

特点

  • 明确的地点经纬度:真实坐标数据。
  • 明确的道路限速:区分高速、主干道、本地道路。
  • 明确的交通拥堵因子:默认增加10%的行程时间。

基于此,路线ETA将严格遵循公式:ETA = 距离 / 速度 × 交通因子,计算结果完全确定。

核心计算模块:Haversine距离与路段ETA

计算两点间距离我们采用地理信息系统(GIS)中常用的Haversine公式,它基于球面模型计算大圆距离,精度足以满足大多数物流场景。

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    R = 6371.0
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

基于距离,我们可以计算单段路的预计行驶时间(ETA)。

def _normalize_site_name(name: str) -> str:
    return name.strip()

def _assert_site_exists(name: str) -> None:
    if name not in SITES:
        raise ValueError(f"Unknown site '{name}'. Use list_sites() or suggest_site().")

def _distance_between(a: str, b: str) -> float:
    _assert_site_exists(a)
    _assert_site_exists(b)
    sa, sb = SITES[a], SITES[b]
    return float(haversine_km(sa["lat"], sa["lon"], sb["lat"], sb["lon"]))

def _eta_minutes(distance_km: float, speed_kmph: float, traffic_multiplier: float) -> float:
    speed = max(float(speed_kmph), 1e-6)
    base_minutes = (distance_km / speed) * 60.0
    return float(base_minutes * max(float(traffic_multiplier), 0.0))

def compute_route_metrics(path: List[str], speed_kmph: float, traffic_multiplier: float) -> Dict[str, Any]:
    if len(path) < 2:
        raise ValueError("Route path must include at least origin and destination.")
    for s in path:
        _assert_site_exists(s)
    legs = []
    total_km = 0.0
    total_min = 0.0
    for i in range(len(path) - 1):
        a, b = path[i], path[i + 1]
        d_km = _distance_between(a, b)
        t_min = _eta_minutes(d_km, speed_kmph, traffic_multiplier)
        legs.append({"from": a, "to": b, "distance_km": d_km, "eta_minutes": t_min})
        total_km += d_km
        total_min += t_min
    return {"route": path, "distance_km": float(total_km), "eta_minutes": float(total_min), "legs": legs}

compute_route_metrics 函数是核心,它接收一个路径列表(如 ["Yard_Main", "Depot_1", "Rig_B"]),并计算出整条路线的总距离、总耗时以及每一段的明细。

路线生成:枚举所有可能路径并寻优

当存在多个可选途经点时,系统需要枚举所有合理的路径排列,并从中找出最优解。我们支持两种优化目标:最短时间(ETA)最短距离

def _all_paths_with_waypoints(origin: str, destination: str, waypoints: List[str], max_stops: int) -> List[List[str]]:
    from itertools import permutations
    waypoints = [w for w in waypoints if w not in (origin, destination)]
    max_stops = int(max(0, max_stops))
    candidates = []
    for k in range(0, min(len(waypoints), max_stops) + 1):
        for perm in permutations(waypoints, k):
            candidates.append([origin, *perm, destination])
    if [origin, destination] not in candidates:
        candidates.insert(0, [origin, destination])
    return candidates

def find_best_route(origin: str, destination: str, allowed_waypoints: Optional[List[str]], max_stops: int, speed_kmph: float, traffic_multiplier: float, objective: str, top_k: int) -> Dict[str, Any]:
    origin = _normalize_site_name(origin)
    destination = _normalize_site_name(destination)
    _assert_site_exists(origin)
    _assert_site_exists(destination)
    allowed_waypoints = allowed_waypoints or []
    for w in allowed_waypoints:
        _assert_site_exists(_normalize_site_name(w))
    objective = (objective or "eta").strip().lower()
    if objective not in {"eta", "distance"}:
        raise ValueError("objective must be one of: 'eta', 'distance'")
    top_k = max(1, int(top_k))
    candidates = _all_paths_with_waypoints(origin, destination, allowed_waypoints, max_stops=max_stops)
    scored = []
    for path in candidates:
        metrics = compute_route_metrics(path, speed_kmph=speed_kmph, traffic_multiplier=traffic_multiplier)
        score = metrics["eta_minutes"] if objective == "eta" else metrics["distance_km"]
        scored.append((score, metrics))
    scored.sort(key=lambda x: x[0])
    best = scored[0][1]
    alternatives = [m for _, m in scored[1:top_k]]
    return {"best": best, "alternatives": alternatives, "objective": objective}

函数 find_best_route 是路线优化的“大脑”,它根据约束条件生成候选路径,计算每条路径的指标,并按优化目标(objective="eta""distance")排序,返回最优解和Top K候选方案。

构建工具层:赋予Agent“手脚”

现在,我们将上述计算能力封装成 LangChain Tool。这是智能体“可信”的根源,LLM必须通过调用这些工具来获取信息或执行计算。

@tool
def list_sites() -> str:
    """List all available site names."""
    return "\n".join(sorted(SITES.keys()))

@tool
def get_site_details(site_name: str) -> str:
    """Get detailed information (lat, lon, type) for a specific site."""
    _assert_site_exists(site_name)
    info = SITES[site_name]
    return f"{site_name}: lat={info['lat']}, lon={info['lon']}, type={info['type']}"

@tool
def suggest_site(keyword: str) -> str:
    """Suggest site names based on a keyword (fuzzy matching)."""
    kw = keyword.lower().strip()
    matches = [name for name in SITES if kw in name.lower()]
    return "\n".join(matches) if matches else f"No sites found matching '{keyword}'."

@tool
def compute_direct_route(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER) -> Dict[str, Any]:
    """Compute direct route metrics between two sites."""
    speed = SPEED_PROFILES.get(road_class, SPEED_PROFILES["arterial"])
    return compute_route_metrics([origin, destination], speed_kmph=speed, traffic_multiplier=traffic_multiplier)

@tool
def optimize_route(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, objective: str = "eta", top_k: int = 3) -> Dict[str, Any]:
    """Find the optimal route, optionally via waypoints, returning the best and top alternatives."""
    speed = SPEED_PROFILES.get(road_class, SPEED_PROFILES["arterial"])
    return find_best_route(origin, destination, allowed_waypoints, max_stops, speed, traffic_multiplier, objective, top_k)

关键工具 optimize_route 封装了之前的寻优逻辑。LLM在思考时,一旦需要比较路径或计算综合最优,就会调用此工具。

定义结构化输出:用Pydantic保证接口规范

为了让输出能被下游系统直接消费,我们使用Pydantic定义严格的返回模式。

class RouteLeg(BaseModel):
    from_site: str
    to_site: str
    distance_km: float
    eta_minutes: float

class RoutePlan(BaseModel):
    route: List[str]
    distance_km: float
    eta_minutes: float
    legs: List[RouteLeg]
    objective: str

class RouteDecision(BaseModel):
    chosen: RoutePlan
    alternatives: List[RoutePlan] = []
    assumptions: Dict[str, Any] = {}
    notes: str = ""
    audit: List[str] = []

RouteDecision 是智能体的最终输出,包含:

  • chosen:选出的最佳路线方案。
  • alternatives:其他候选方案列表。
  • assumptions & notes:记录计算所基于的假设或额外说明。
  • audit:审计日志,记录决策过程中的关键步骤(例如调用了哪些工具)。

这确保了输出是标准的JSON对象,无需再从文本中解析信息。

组装Route Optimizer Agent

最后,我们将LLM、工具和输出格式绑定在一起,创建智能体。

SYSTEM_PROMPT = (
    "You are the Route Optimizer Agent for a logistics dispatch center.\n"
    "You MUST use tools for any distance/ETA calculation.\n"
    "Return ONLY the structured RouteDecision."
)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

route_agent = create_agent(
    model=llm,
    tools=[list_sites, get_site_details, suggest_site, compute_direct_route, optimize_route],
    system_prompt=SYSTEM_PROMPT,
    response_format=RouteDecision,
)

def get_route_decision(origin: str, destination: str, road_class: str = "arterial", traffic_multiplier: float = DEFAULT_TRAFFIC_MULTIPLIER, allowed_waypoints: Optional[List[str]] = None, max_stops: int = 2, objective: str = "eta", top_k: int = 3) -> RouteDecision:
    user_msg = {
        "role": "user",
        "content": (
            f"Optimize the route from {origin} to {destination}.\n"
            f"road_class={road_class}, traffic_multiplier={traffic_multiplier}\n"
            f"objective={objective}, top_k={top_k}\n"
            f"allowed_waypoints={allowed_waypoints}, max_stops={max_stops}\n"
            "Return the structured RouteDecision only."
        ),
    }
    result = route_agent.invoke({"messages": [user_msg]})
    return result["structured_response"]

系统提示词(SYSTEM_PROMPT)强制Agent必须使用工具进行计算。temperature=0.2设置为低随机性,保证输出的稳定性。create_agent 函数将一切连接起来,并指定了 RouteDecision 作为响应格式。

实际调用示例

让我们看看这个智能体如何工作。

示例1:简单直达路线

decision1 = get_route_decision("Yard_Main", "Rig_B", road_class="arterial", traffic_multiplier=1.12)
print(decision1.model_dump())

Agent会理解任务,调用 optimize_route 工具(或先调用compute_direct_route),并返回结构化的 RouteDecision

示例2:复杂多途经点优化

decision2 = get_route_decision("Rig_C", "Rig_B", road_class="highway", traffic_multiplier=1.08, allowed_waypoints=["Depot_1", "Depot_2", "Yard_Main"], max_stops=2, objective="eta", top_k=3)
print(decision2.model_dump())

在这个案例中,Agent需要处理最多2个途经点,在Depot_1Depot_2Yard_Main中排列组合,找出时间(ETA)最短的前3条路线。它会自动调用 optimize_route 工具完成所有繁重的计算和排序工作。

总结与展望

通过以上步骤,我们成功构建了一个具备生产级潜力的路线优化智能体:

  • 💯 可生产部署:代码模块化,依赖清晰。
  • 🔍 可审计:所有计算通过工具完成,有迹可循。
  • 📊 结构化输出:Pydantic Schema保证接口一致性。
  • 🔐 确定性运算:结果不依赖LLM的“想象”。
  • 🧠 智能选择策略:LLM负责理解复杂约束并调用正确工具。

这个框架具有高度的可扩展性。你可以轻松地:

  1. 集成实时交通API(如高德、Google Maps),替换静态的 traffic_multiplier
  2. 引入成本模型,将油价、司机工时费作为优化目标。
  3. 添加车辆约束(载重、体积、车长),使其升级为车辆路径问题(VRP)求解器。
  4. 连接数据库,动态加载站点和路网信息。

希望这篇实践指南能帮助你掌握构建确定性AI智能体的核心方法。在云栈社区,你可以找到更多关于Agent、大模型应用开发以及系统架构的深度讨论与资源。欢迎一起交流,将想法变为可落地的系统。




上一篇:RUL预测实战对比:LSTM与Transformer在工业设备剩余使用寿命预测中的表现差异
下一篇:AWS AI工具Kiro权限失控,自主删除生产环境致服务中断13小时
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-2-25 09:11 , Processed in 1.473219 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表