预计阅读:10分钟 | 代码均可 cargo run 运行
一、上回回顾
第5篇我们实现了 Executor,一行 executor.run(&canvas) 就能自动推导执行顺序、按连线传递数据。但真实场景里,LLM 会超时、会限流、会返回垃圾数据。
本篇聚焦:当一切出错时,如何优雅降级,而不是整条管线崩溃。
二、错误分类:哪些能重试,哪些不能
先给错误分个类,不同类型不同策略:
// ─── Agent 错误类型 ─────────────────────────────────
#[derive(Debug, Clone)]
pub enum ErrorKind {
Timeout, // LLM 调用超时
RateLimit, // 429 限流
InvalidResponse, // 返回内容无法解析
NetworkDown, // 网络中断
AgentPanic, // Agent 内部 panic
ConfigMissing, // 配置缺失
Custom(String), // 自定义错误
}
#[derive(Debug)]
pub struct AgentError {
pub agent_id: NodeId,
pub kind: ErrorKind,
pub message: String,
pub retryable: bool, // 能不能重试
pub attempts: u32, // 已重试次数
}
impl AgentError {
pub fn new(id: &NodeId, kind: ErrorKind, msg: &str) -> Self {
let retryable = matches!(kind,
ErrorKind::Timeout
| ErrorKind::RateLimit
| ErrorKind::NetworkDown
);
Self { agent_id: id.clone(), kind, message: msg.into(), retryable, attempts: 0 }
}
}
核心字段 retryable 决定了能不能重试:
| ErrorKind |
可重试 |
典型场景 |
| Timeout |
✅ |
LLM 响应超过 30s |
| RateLimit |
✅ |
429 Too Many Requests |
| NetworkDown |
✅ |
DNS 解析失败 |
| InvalidResponse |
❌ |
JSON 解析失败 |
| AgentPanic |
❌ |
内部 unwrap 崩溃 |
| ConfigMissing |
❌ |
API Key 未配置 |
三、重试策略:指数退避
可重试的错误,用指数退避重试:
// ─── 重试策略 ───────────────────────────────────────
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_retries: u32, // 最大重试次数
pub base_delay: Duration, // 初始等待
pub max_delay: Duration, // 最大等待
pub multiplier: f64, // 退避倍数
}
impl RetryPolicy {
pub fn default() -> Self {
Self {
max_retries: 3,
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(10),
multiplier: 2.0,
}
}
/// 指数退避:100ms → 200ms → 400ms → ...
pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
let delay_ms = self.base_delay.as_millis() as f64
* self.multiplier.powi(attempt as i32);
let capped = delay_ms.min(self.max_delay.as_millis() as f64);
Duration::from_millis(capped as u64)
}
}
退避曲线:100ms → 200ms → 400ms → 800ms → ...,封顶 max_delay。
四、带重试的执行逻辑
把 RetryPolicy 和 Executor 结合:
// ─── 带重试的执行逻辑 ──────────────────────────────
impl Executor {
pub fn execute_with_retry(
&self,
agent: &dyn Agent,
ctx: &Context,
policy: &RetryPolicy,
) -> Result<Value, AgentError> {
let mut last_err = None;
for attempt in 0..=policy.max_retries {
match agent.execute(ctx) {
Ok(val) => return Ok(val),
Err(e) => {
let mut err = AgentError::new(
agent.id(), e.kind.clone(), &e.message
);
err.attempts = attempt + 1;
if !err.retryable {
// 不可重试的错误,直接返回
return Err(err);
}
let delay = policy.delay_for_attempt(attempt);
std::thread::sleep(delay);
last_err = Some(err);
}
}
}
Err(last_err.unwrap())
}
}
关键点:
- 不可重试的错误(InvalidResponse),立即返回,不浪费重试
- 可重试的错误(Timeout/RateLimit),退避后重试
- 重试次数耗尽,返回最后一次错误
五、熔断器:别往死路上冲
如果某个 Agent 连续失败 3 次,继续请求只会浪费资源。熔断器(Circuit Breaker)就是干这个的:
// ─── 熔断器 ─────────────────────────────────────────
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
Closed, // 正常:请求放行
Open, // 熔断:请求直接拒绝
HalfOpen, // 半开:放行一个请求探测
}
pub struct CircuitBreaker {
state: CircuitState,
failure_count: u32,
threshold: u32, // 连续失败几次触发熔断
reset_timeout: Duration, // 熔断多久后半开
last_failure: Option<std::time::Instant>,
}
impl CircuitBreaker {
pub fn new(threshold: u32, reset_timeout: Duration) -> Self {
Self {
state: CircuitState::Closed,
failure_count: 0,
threshold,
reset_timeout,
last_failure: None,
}
}
pub fn allow(&mut self) -> bool {
match self.state {
CircuitState::Closed => true,
CircuitState::Open => {
// 看是否过了冷却期
if let Some(t) = self.last_failure {
if t.elapsed() > self.reset_timeout {
self.state = CircuitState::HalfOpen;
return true; // 放一个请求试试
}
}
false // 熔断中,拒绝
}
CircuitState::HalfOpen => true,
}
}
pub fn record_success(&mut self) {
self.failure_count = 0;
self.state = CircuitState::Closed;
}
pub fn record_failure(&mut self) {
self.failure_count += 1;
self.last_failure = Some(std::time::Instant::now());
if self.failure_count >= self.threshold {
self.state = CircuitState::Open;
}
}
}
三种状态:
- Closed(正常)— 请求放行
- Open(熔断)— 请求直接拒绝,等冷却期
- HalfOpen(半开)— 放一个请求探测,成功则恢复
六、降级链:Plan B、Plan C
熔断只是阻止坏请求,用户还得拿到结果。FallbackChain 提供多级备选方案:
// ─── 降级链 ─────────────────────────────────────────
pub struct FallbackChain {
agents: Vec<Box<dyn Agent>>,
breaker: CircuitBreaker,
}
impl FallbackChain {
pub fn new(agents: Vec<Box<dyn Agent>>) -> Self {
Self {
agents,
breaker: CircuitBreaker::new(3, Duration::from_secs(30)),
}
}
/// 按优先级尝试,第一个成功就返回
pub fn execute(&mut self, ctx: &Context) -> Result<Value, AgentError> {
let mut last_err = None;
for agent in &self.agents {
if !self.breaker.allow() {
continue; // 熔断中,跳到下一个
}
match agent.execute(ctx) {
Ok(val) => {
self.breaker.record_success();
return Ok(val);
}
Err(e) => {
self.breaker.record_failure();
last_err = Some(AgentError::new(
agent.id(), e.kind.clone(), &e.message
));
}
}
}
Err(last_err.unwrap_or_else(|| AgentError::new(
&NodeId("none".into()),
ErrorKind::ConfigMissing,
"所有Agent均不可用"
)))
}
}
典型配置:GPT-4(主力)→ DeepSeek(备选)→ 本地字典(兜底)
七、实战:翻译管线 + 错误处理
把前面几篇的翻译管线加上完整的容错机制:
// ─── 实战:翻译管线 + 错误处理 ─────────────────────
fn main() {
let mut canvas = Canvas::new("robust-translate");
// 三个翻译 Agent:GPT-4 优先,DeepSeek 备选,本地兜底
let gpt4 = Gpt4Translator::new();
let deepseek = DeepSeekTranslator::new();
let local = LocalDictTranslator::new();
let mut fallback = FallbackChain::new(vec![
Box::new(gpt4),
Box::new(deepseek),
Box::new(local),
]);
let ctx = Context::from_json(json!({
"text": "Hello World",
"target_lang": "zh-CN"
}));
// 带降级的执行
match fallback.execute(&ctx) {
Ok(val) => println!("翻译结果: {:?}", val),
Err(e) => {
eprintln!("所有Agent均失败: {:?}", e.kind);
// 发送告警通知
send_alert(&e);
}
}
}
八、运行结果
$ cargo run
[Executor] Gpt4Translator 执行中...
[Executor] Gpt4Translator 失败: Timeout (第1次)
[Executor] 等待 100ms 后重试...
[Executor] Gpt4Translator 失败: RateLimit (第2次)
[Executor] 等待 200ms 后重试...
[Executor] Gpt4Translator 失败: Timeout (第3次)
[Circuit] Gpt4Translator 连续失败 3 次,熔断!
[Fallback] 切换到 DeepSeekTranslator...
[Executor] DeepSeekTranslator 执行中...
[Executor] DeepSeekTranslator 成功!
翻译结果: {"text": "你好世界", "from": "DeepSeekTranslator"}
九、执行结果增强
StepResult 增加重试和降级信息:
// ─── 执行结果增强 ──────────────────────────────────
#[derive(Debug, Clone)]
pub struct StepResult {
pub node_id: NodeId,
pub status: StepStatus,
pub output: Option<Value>,
pub error: Option<AgentError>,
pub duration: Duration,
pub retries: u32, // 重试了几次
pub fallback_from: Option<NodeId>, // 从哪个Agent降级来的
}
#[derive(Debug, Clone, PartialEq)]
pub enum StepStatus {
Success,
Failed,
Skipped,
Retried, // 重试后成功
Fallback, // 降级后成功
}
5种状态,精确描述每一步的结局。
十、策略速查表
| 场景 |
策略 |
参数 |
| LLM 偶发超时 |
指数退避 |
3次, 100ms起步 |
| API 限流 429 |
退避 + jitter |
5次, 1s起步 |
| 某个模型挂了 |
熔断 + 降级 |
3次失败熔断, 30s冷却 |
| 全部挂了 |
本地兜底 |
字典翻译/缓存结果 |
十一、总结
- 错误分类:retryable 决定能不能重试
- 指数退避:100ms → 200ms → 400ms,封顶 10s
- 熔断器:连续 3 次失败就熔断,30s 后半开探测
- 降级链:主力 → 备选 → 兜底,总有一个能用
- StepResult:5种状态,重试/降级信息可追溯
六篇下来,核心骨架已经完备:
- 01-03:画布 + 节点 + 连线(数据模型)
- 04:Agent Trait(执行单元)
- 05:Executor(调度引擎)
- 06:错误处理与重试(容错机制)✅ 本篇
下一篇我们聊 07 | 异步执行:tokio + 并行节点:当多个节点没有依赖关系时,如何并行跑起来。