找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3182

积分

0

好友

426

主题
发表于 2 小时前 | 查看: 2| 回复: 0

预计阅读:10分钟 | 代码均可 cargo run 运行

一、上回回顾

第5篇我们实现了 Executor,一行 executor.run(&canvas) 就能自动推导执行顺序、按连线传递数据。但真实场景里,LLM 会超时、会限流、会返回垃圾数据。

本篇聚焦:当一切出错时,如何优雅降级,而不是整条管线崩溃。

二、错误分类:哪些能重试,哪些不能

先给错误分个类,不同类型不同策略:

// ─── Agent 错误类型 ─────────────────────────────────
#[derive(Debug, Clone)]
pub enum ErrorKind {
    Timeout,              // LLM 调用超时
    RateLimit,            // 429 限流
    InvalidResponse,      // 返回内容无法解析
    NetworkDown,          // 网络中断
    AgentPanic,           // Agent 内部 panic
    ConfigMissing,        // 配置缺失
    Custom(String),       // 自定义错误
}

#[derive(Debug)]
pub struct AgentError {
    pub agent_id: NodeId,
    pub kind: ErrorKind,
    pub message: String,
    pub retryable: bool,  // 能不能重试
    pub attempts: u32,    // 已重试次数
}

impl AgentError {
    pub fn new(id: &NodeId, kind: ErrorKind, msg: &str) -> Self {
        let retryable = matches!(kind,
            ErrorKind::Timeout
            | ErrorKind::RateLimit
            | ErrorKind::NetworkDown
        );
        Self { agent_id: id.clone(), kind, message: msg.into(), retryable, attempts: 0 }
    }
}

核心字段 retryable 决定了能不能重试:

ErrorKind 可重试 典型场景
Timeout LLM 响应超过 30s
RateLimit 429 Too Many Requests
NetworkDown DNS 解析失败
InvalidResponse JSON 解析失败
AgentPanic 内部 unwrap 崩溃
ConfigMissing API Key 未配置

三、重试策略:指数退避

可重试的错误,用指数退避重试:

// ─── 重试策略 ───────────────────────────────────────
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    pub max_retries: u32,         // 最大重试次数
    pub base_delay: Duration,     // 初始等待
    pub max_delay: Duration,      // 最大等待
    pub multiplier: f64,          // 退避倍数
}

impl RetryPolicy {
    pub fn default() -> Self {
        Self {
            max_retries: 3,
            base_delay: Duration::from_millis(100),
            max_delay: Duration::from_secs(10),
            multiplier: 2.0,
        }
    }

    /// 指数退避:100ms → 200ms → 400ms → ...
    pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
        let delay_ms = self.base_delay.as_millis() as f64
            * self.multiplier.powi(attempt as i32);
        let capped = delay_ms.min(self.max_delay.as_millis() as f64);
        Duration::from_millis(capped as u64)
    }
}

退避曲线:100ms → 200ms → 400ms → 800ms → ...,封顶 max_delay

四、带重试的执行逻辑

把 RetryPolicy 和 Executor 结合:

// ─── 带重试的执行逻辑 ──────────────────────────────
impl Executor {
    pub fn execute_with_retry(
        &self,
        agent: &dyn Agent,
        ctx: &Context,
        policy: &RetryPolicy,
    ) -> Result<Value, AgentError> {
        let mut last_err = None;

        for attempt in 0..=policy.max_retries {
            match agent.execute(ctx) {
                Ok(val) => return Ok(val),
                Err(e) => {
                    let mut err = AgentError::new(
                        agent.id(), e.kind.clone(), &e.message
                    );
                    err.attempts = attempt + 1;

                    if !err.retryable {
                        // 不可重试的错误,直接返回
                        return Err(err);
                    }

                    let delay = policy.delay_for_attempt(attempt);
                    std::thread::sleep(delay);
                    last_err = Some(err);
                }
            }
        }
        Err(last_err.unwrap())
    }
}

关键点:

  • 不可重试的错误(InvalidResponse),立即返回,不浪费重试
  • 可重试的错误(Timeout/RateLimit),退避后重试
  • 重试次数耗尽,返回最后一次错误

五、熔断器:别往死路上冲

如果某个 Agent 连续失败 3 次,继续请求只会浪费资源。熔断器(Circuit Breaker)就是干这个的:

// ─── 熔断器 ─────────────────────────────────────────
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    Closed,      // 正常:请求放行
    Open,        // 熔断:请求直接拒绝
    HalfOpen,    // 半开:放行一个请求探测
}

pub struct CircuitBreaker {
    state: CircuitState,
    failure_count: u32,
    threshold: u32,            // 连续失败几次触发熔断
    reset_timeout: Duration,   // 熔断多久后半开
    last_failure: Option<std::time::Instant>,
}

impl CircuitBreaker {
    pub fn new(threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            state: CircuitState::Closed,
            failure_count: 0,
            threshold,
            reset_timeout,
            last_failure: None,
        }
    }

    pub fn allow(&mut self) -> bool {
        match self.state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                // 看是否过了冷却期
                if let Some(t) = self.last_failure {
                    if t.elapsed() > self.reset_timeout {
                        self.state = CircuitState::HalfOpen;
                        return true;  // 放一个请求试试
                    }
                }
                false  // 熔断中,拒绝
            }
            CircuitState::HalfOpen => true,
        }
    }

    pub fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitState::Closed;
    }

    pub fn record_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure = Some(std::time::Instant::now());
        if self.failure_count >= self.threshold {
            self.state = CircuitState::Open;
        }
    }
}

三种状态:

  • Closed(正常)— 请求放行
  • Open(熔断)— 请求直接拒绝,等冷却期
  • HalfOpen(半开)— 放一个请求探测,成功则恢复

六、降级链:Plan B、Plan C

熔断只是阻止坏请求,用户还得拿到结果。FallbackChain 提供多级备选方案:

// ─── 降级链 ─────────────────────────────────────────
pub struct FallbackChain {
    agents: Vec<Box<dyn Agent>>,
    breaker: CircuitBreaker,
}

impl FallbackChain {
    pub fn new(agents: Vec<Box<dyn Agent>>) -> Self {
        Self {
            agents,
            breaker: CircuitBreaker::new(3, Duration::from_secs(30)),
        }
    }

    /// 按优先级尝试,第一个成功就返回
    pub fn execute(&mut self, ctx: &Context) -> Result<Value, AgentError> {
        let mut last_err = None;
        for agent in &self.agents {
            if !self.breaker.allow() {
                continue;  // 熔断中,跳到下一个
            }
            match agent.execute(ctx) {
                Ok(val) => {
                    self.breaker.record_success();
                    return Ok(val);
                }
                Err(e) => {
                    self.breaker.record_failure();
                    last_err = Some(AgentError::new(
                        agent.id(), e.kind.clone(), &e.message
                    ));
                }
            }
        }
        Err(last_err.unwrap_or_else(|| AgentError::new(
            &NodeId("none".into()),
            ErrorKind::ConfigMissing,
            "所有Agent均不可用"
        )))
    }
}

典型配置:GPT-4(主力)→ DeepSeek(备选)→ 本地字典(兜底)

七、实战:翻译管线 + 错误处理

把前面几篇的翻译管线加上完整的容错机制:

// ─── 实战:翻译管线 + 错误处理 ─────────────────────
fn main() {
    let mut canvas = Canvas::new("robust-translate");

    // 三个翻译 Agent:GPT-4 优先,DeepSeek 备选,本地兜底
    let gpt4 = Gpt4Translator::new();
    let deepseek = DeepSeekTranslator::new();
    let local = LocalDictTranslator::new();

    let mut fallback = FallbackChain::new(vec![
        Box::new(gpt4),
        Box::new(deepseek),
        Box::new(local),
    ]);

    let ctx = Context::from_json(json!({
        "text": "Hello World",
        "target_lang": "zh-CN"
    }));

    // 带降级的执行
    match fallback.execute(&ctx) {
        Ok(val) => println!("翻译结果: {:?}", val),
        Err(e) => {
            eprintln!("所有Agent均失败: {:?}", e.kind);
            // 发送告警通知
            send_alert(&e);
        }
    }
}

八、运行结果

$ cargo run

[Executor] Gpt4Translator 执行中...
[Executor] Gpt4Translator 失败: Timeout (第1次)
[Executor] 等待 100ms 后重试...
[Executor] Gpt4Translator 失败: RateLimit (第2次)
[Executor] 等待 200ms 后重试...
[Executor] Gpt4Translator 失败: Timeout (第3次)
[Circuit] Gpt4Translator 连续失败 3 次,熔断!
[Fallback] 切换到 DeepSeekTranslator...
[Executor] DeepSeekTranslator 执行中...
[Executor] DeepSeekTranslator 成功!
翻译结果: {"text": "你好世界", "from": "DeepSeekTranslator"}

九、执行结果增强

StepResult 增加重试和降级信息:

// ─── 执行结果增强 ──────────────────────────────────
#[derive(Debug, Clone)]
pub struct StepResult {
    pub node_id: NodeId,
    pub status: StepStatus,
    pub output: Option<Value>,
    pub error: Option<AgentError>,
    pub duration: Duration,
    pub retries: u32,           // 重试了几次
    pub fallback_from: Option<NodeId>,  // 从哪个Agent降级来的
}

#[derive(Debug, Clone, PartialEq)]
pub enum StepStatus {
    Success,
    Failed,
    Skipped,
    Retried,       // 重试后成功
    Fallback,      // 降级后成功
}

5种状态,精确描述每一步的结局。

十、策略速查表

场景 策略 参数
LLM 偶发超时 指数退避 3次, 100ms起步
API 限流 429 退避 + jitter 5次, 1s起步
某个模型挂了 熔断 + 降级 3次失败熔断, 30s冷却
全部挂了 本地兜底 字典翻译/缓存结果

十一、总结

  1. 错误分类:retryable 决定能不能重试
  2. 指数退避:100ms → 200ms → 400ms,封顶 10s
  3. 熔断器:连续 3 次失败就熔断,30s 后半开探测
  4. 降级链:主力 → 备选 → 兜底,总有一个能用
  5. StepResult:5种状态,重试/降级信息可追溯

六篇下来,核心骨架已经完备:

  • 01-03:画布 + 节点 + 连线(数据模型)
  • 04:Agent Trait(执行单元)
  • 05:Executor(调度引擎)
  • 06:错误处理与重试(容错机制)✅ 本篇

下一篇我们聊 07 | 异步执行:tokio + 并行节点:当多个节点没有依赖关系时,如何并行跑起来。





上一篇:7个JavaScript AI库:资深工程师快速上手指南
下一篇:GPT-Image-2深度评测:中文渲染、推理架构与Arena榜首解析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-27 03:35 , Processed in 0.623987 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表