找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

5271

积分

0

好友

727

主题
发表于 3 小时前 | 查看: 4| 回复: 0

单 Agent 能跑通 Demo,Multi-Agent 才能承载生产。真正的分水岭不是 Prompt 写得多漂亮,而是能否把智能体能力拆成可治理、可扩展、可观测、可灰度的工程系统。

版本与适用范围

本文面向 Java/Spring 技术栈,重点讨论如何基于 Spring AI、Spring AI Alibaba、A2A、Nacos、Redis、Kafka 与 Kubernetes 构建生产级 Multi-Agent 系统。

示例代码以 Java 21、Spring Boot 3.x、Spring AI 1.x API 风格为主,A2A 与 Nacos 部分参考 Spring AI Alibaba 1.0.0.4+ 及 Spring AI A2A 社区实现。由于 Spring AI 生态迭代较快,生产落地时应通过 BOM 锁定依赖版本,并以官方文档中的当前 API 为准。

一、为什么单 Agent 到生产一定会遇到天花板

很多团队做 AI 应用的第一版,通常是这样的:

一个超级系统提示词 + 一堆工具 + 一个 ChatClient = 一个看起来什么都会的 Agent

这个方案在原型阶段很快,但一旦进入电商客服、金融投顾、企业知识库、售后工单、运维诊断这类真实场景,就会暴露出结构性问题。

1.1 单 Agent 的四个核心问题

第一,认知负载过高。
一个 Agent 同时负责意图识别、订单查询、库存判断、优惠计算、退款审核、物流追踪和人工升级,模型需要在大量工具和规则之间做选择。工具越多,选择越不稳定,Prompt 越长,推理噪声越大。

第二,上下文污染严重。
订单问题、商品问题、售后问题、闲聊内容混在同一个上下文窗口里,模型很容易把历史信息误用于当前任务。例如用户上一轮问过 A 商品库存,下一轮查询 B 商品退款,模型可能错误复用 A 商品上下文。

第三,工程扩展性差。
单 Agent 无法按能力独立扩容。大促期间订单查询暴涨,本应只扩容订单 Agent,却不得不扩容整个 AI 服务,导致商品咨询、售后问答、闲聊能力也被动扩容,成本不可控。

第四,治理边界不清。
在企业里,订单团队、库存团队、售后团队、风控团队通常是不同组织。单 Agent 模式会把所有业务规则、工具定义和 Prompt 堆在一起,既难以审计,也难以灰度发布。

1.2 Multi-Agent 的本质不是"多个机器人聊天"

生产级 Multi-Agent 不是让几个大模型互相对话,而是把智能能力拆成多个具备明确职责、工具权限、上下文边界和治理策略的执行单元。

一个典型智能客服系统可以拆成:

Agent 职责 可访问工具 扩容特征
Intent Agent 意图识别、置信度评估 意图分类模型、规则库 CPU 轻、低延迟
Order Agent 订单查询、取消、退款前置判断 订单服务、支付服务 大促高峰明显
Product Agent 商品咨询、库存、推荐 商品服务、库存服务、向量检索 搜索链路重
AfterSale Agent 物流、退换货、纠纷处理 物流服务、售后系统 长链路、强规则
Risk Agent 风控校验、敏感操作拦截 风控规则、黑名单、审计系统 强一致、低容错
Human Handoff Agent 人工升级、工单生成 CRM、工单系统 业务兜底

Multi-Agent 的工程价值在于:

  • 职责单一:每个 Agent 只处理一个业务域,Prompt 更短,工具更少,决策更稳定。
  • 独立扩容:订单 Agent、售后 Agent、商品 Agent 可以按流量特征独立伸缩。
  • 故障隔离:库存工具异常不应拖垮退款链路,推荐服务超时不应影响订单查询。
  • 独立治理:不同团队可以独立维护自己的 Agent、工具、Prompt 和评估集。
  • 安全可控:高风险能力可以挂接 Risk Agent 与人工确认,不把关键操作完全交给模型。

二、生产级 Multi-Agent 总体架构

2.1 分层架构

这套架构有一个重要原则:LLM 只负责需要推理的部分,不负责系统治理。

也就是说,权限、限流、熔断、幂等、审计、灰度、租户隔离、预算控制、结果校验,都应该由工程系统兜住,而不是写进 Prompt 后祈祷模型遵守。

2.2 核心组件职责

组件 职责 关键设计点
Orchestrator 接收请求、识别意图、路由 Agent、聚合结果 超时预算、并发执行、降级策略
Agent Registry 管理本地与远程 Agent 元数据 能力标签、版本、健康状态、灰度权重
Context Manager 管理会话、任务、链路上下文 Redis 持久化、上下文压缩、租户隔离
Tool Registry 管理工具定义与执行入口 动态工具发现、权限控制、审计
Guardrail 输入输出安全与业务风控 敏感词、PII、越权、人工确认
Event Bus 异步任务和领域事件 Kafka 解耦、削峰、最终一致
Observability 可观测与成本治理 traceId、token、latency、tool error

2.3 技术选型建议

场景 推荐方案
快速原型 Spring AI + ChatClient + 本地工具
中小规模生产 Spring AI + Redis ChatMemory/Context + Resilience4j + Micrometer
多团队协作 Spring AI Alibaba + Nacos A2A Registry + 独立 Agent 服务
高并发场景 Java 21 虚拟线程 + Reactor 流式输出 + Redis 限流 + K8s HPA
企业级治理 A2A + MCP + Kafka + OpenTelemetry + 评估集 + 灰度平台

三、领域建模:先把 Agent 当成工程组件,而不是 Prompt

3.1 Agent 元数据模型

生产环境中,一个 Agent 至少要描述这些信息:

package com.example.agent.core;

import java.time.Duration;
import java.util.Set;

public record AgentDescriptor(
        String name,
        String version,
        String description,
        Set<String> capabilities,
        Set<String> allowedToolNames,
        Duration timeout,
        int maxConcurrency,
        boolean streamingSupported,
        AgentStatus status
) {
    public boolean canHandle(String capability) {
        return status == AgentStatus.UP && capabilities.contains(capability);
    }
}

enum AgentStatus {
    UP, DEGRADED, DOWN
}

这里的重点不是字段本身,而是架构思想:Agent 必须像微服务一样被描述、注册、发现、限流、熔断和观测。

3.2 统一执行接口

package com.example.agent.core;

import reactor.core.publisher.Flux;

public interface Agent {

    AgentDescriptor descriptor();

    AgentResult execute(AgentRequest request);

    default Flux<AgentStreamChunk> stream(AgentRequest request) {
        return Flux.just(AgentStreamChunk.done(execute(request).answer()));
    }
}
package com.example.agent.core;

import java.time.Instant;
import java.util.Map;

public record AgentRequest(
        String requestId,
        String tenantId,
        String userId,
        String sessionId,
        String input,
        AgentContext context,
        Map<String, Object> attributes,
        Instant deadline
) {
    public boolean expired() {
        return Instant.now().isAfter(deadline);
    }
}

public record AgentResult(
        String agentName,
        String answer,
        double confidence,
        boolean requiresHuman,
        Map<String, Object> facts
) {
    public static AgentResult humanRequired(String agentName, String reason) {
        return new AgentResult(agentName, reason, 0.0, true, Map.of("reason", reason));
    }
}

public record AgentStreamChunk(String content, boolean done) {
    public static AgentStreamChunk chunk(String content) {
        return new AgentStreamChunk(content, false);
    }

    public static AgentStreamChunk done(String content) {
        return new AgentStreamChunk(content, true);
    }
}

统一接口的好处是,Orchestrator 不需要关心底层是本地 Agent、远程 A2A Agent、Graph 工作流,还是一个普通 Java 服务。

四、Skills 与 Tool Calling:从"能调用"升级为"可治理"

Spring AI 的 Tool Calling 把 Java 方法或函数暴露给模型,让模型在需要时调用业务系统。官方抽象里,工具最终会被建模为 ToolCallback,可以通过 @Tool、函数式 Bean 或自定义 ToolCallback 提供。

但生产环境不能只停留在 @Tool。一个可上线的 Skill 至少要具备:

  • 元数据:名称、版本、负责人、风险等级、所属业务域。
  • 权限:哪些 Agent、租户、用户角色可以调用。
  • 治理:超时、重试、熔断、限流、降级。
  • 审计:谁在什么上下文里调用了什么工具,参数和结果是否脱敏。
  • 幂等:创建工单、取消订单、发起退款等写操作必须可重复提交。

4.1 Skill 元数据

package com.example.agent.skill;

import java.time.Duration;
import java.util.Set;

public record SkillMetadata(
        String name,
        String version,
        String owner,
        String domain,
        SkillRiskLevel riskLevel,
        Set<String> allowedAgents,
        Duration timeout,
        int requestsPerSecond,
        boolean returnDirect
) {
}

enum SkillRiskLevel {
    LOW, MEDIUM, HIGH
}

4.2 生产级库存查询 Skill

下面的代码展示一个更接近生产的工具实现:包含参数校验、限流、超时、指标、审计与异常转换。

package com.example.agent.skill.inventory;

import com.example.agent.skill.SkillMetadata;
import com.example.agent.skill.SkillRiskLevel;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import jakarta.validation.constraints.NotBlank;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Set;

@Component
public class InventorySkill {

    private static final Logger log = LoggerFactory.getLogger(InventorySkill.class);

    private final InventoryClient inventoryClient;
    private final MeterRegistry meterRegistry;

    public InventorySkill(InventoryClient inventoryClient, MeterRegistry meterRegistry) {
        this.inventoryClient = inventoryClient;
        this.meterRegistry = meterRegistry;
    }

    public SkillMetadata metadata() {
        return new SkillMetadata(
                "inventory_query",
                "1.0.0",
                "inventory-team",
                "product",
                SkillRiskLevel.LOW,
                Set.of("product-agent", "orchestrator-agent"),
                Duration.ofSeconds(2),
                300,
                false
        );
    }

    @Tool(name = "inventory_query", description = "查询指定 SKU 在指定仓库的可售库存。只用于库存咨询,不用于锁库存。")
    @CircuitBreaker(name = "inventoryClient", fallbackMethod = "fallback")
    @RateLimiter(name = "inventoryClient")
    public InventoryResponse queryInventory(
            @ToolParam(description = "商品 SKU 编码,例如 SKU-10086") @NotBlank String sku,
            @ToolParam(description = "仓库编码,可为空;为空时查询全国汇总库存") String warehouseCode
    ) {
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            InventoryResponse response = inventoryClient.query(sku, warehouseCode);
            meterRegistry.counter("agent.skill.success", "skill", "inventory_query").increment();
            return response;
        } catch (RuntimeException error) {
            meterRegistry.counter("agent.skill.error", "skill", "inventory_query").increment();
            log.warn("Inventory query failed, sku={}, warehouse={}", sku, warehouseCode, error);
            throw error;
        } finally {
            sample.stop(meterRegistry.timer("agent.skill.duration", "skill", "inventory_query"));
        }
    }

    public InventoryResponse fallback(String sku, String warehouseCode, Throwable error) {
        return new InventoryResponse(sku, warehouseCode, -1, "INVENTORY_TEMPORARILY_UNAVAILABLE");
    }
}

public interface InventoryClient {
    InventoryResponse query(String sku, String warehouseCode);
}

public record InventoryResponse(
        String sku,
        String warehouseCode,
        int availableQuantity,
        String status
) {
}

注意:Spring AI 工具方法对参数和返回类型有一定限制。生产中建议把"模型可见的工具定义"和"内部客户端实现"分层:Tool 方法保持简单稳定,HTTP 连接超时、读取超时、线程隔离、熔断限流交给底层客户端和治理组件处理。

4.3 动态工具发现:不要一次性把所有工具塞给模型

当工具数量超过几十个时,如果每次请求都把所有工具定义传给模型,会产生三个问题:

  • Token 成本暴涨。
  • 模型在大量相似工具中选择错误的概率上升。
  • 工具描述泄露过多内部能力,增加安全风险。

更好的做法是按 Agent、意图和权限动态筛选工具,必要时使用 Spring AI 的动态工具搜索模式:模型初始只看到一个"工具搜索工具",需要能力时再搜索并展开相关工具定义。

package com.example.agent.skill;

import org.springframework.ai.tool.ToolCallback;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class GovernedToolRegistry {

    private final Map<String, GovernedTool> tools = new ConcurrentHashMap<>();

    public void register(GovernedTool tool) {
        tools.put(tool.metadata().name(), tool);
    }

    public List<ToolCallback> resolveFor(String agentName, String tenantId, String intent) {
        return tools.values().stream()
                .filter(tool -> tool.metadata().allowedAgents().contains(agentName))
                .filter(tool -> tool.policy().tenantAllowed(tenantId))
                .filter(tool -> tool.policy().intentAllowed(intent))
                .map(GovernedTool::callback)
                .toList();
    }
}

public record GovernedTool(
        SkillMetadata metadata,
        ToolPolicy policy,
        ToolCallback callback
) {
}

public interface ToolPolicy {
    boolean tenantAllowed(String tenantId);

    boolean intentAllowed(String intent);
}

五、上下文工程:让 Agent 记该记的,忘该忘的

Multi-Agent 系统里,上下文管理比单 Agent 更复杂。因为上下文至少分为四类:

上下文类型 示例 生命周期
Request Context requestId、traceId、deadline 单次请求
Session Context 最近对话、用户偏好 一个会话
Task Context 当前任务状态、调用过的 Agent 一个任务
Long-term Memory 用户长期偏好、历史工单摘要 跨会话

5.1 AgentContext 设计

package com.example.agent.context;

import org.springframework.ai.chat.messages.Message;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AgentContext {

    private final String tenantId;
    private final String sessionId;
    private final String threadId;
    private final List<Message> recentMessages;
    private final Map<String, Object> attributes;
    private final ExecutionTrace trace;
    private final Instant createdAt;

    public AgentContext(String tenantId, String sessionId, String threadId) {
        this.tenantId = tenantId;
        this.sessionId = sessionId;
        this.threadId = threadId;
        this.recentMessages = new ArrayList<>();
        this.attributes = new ConcurrentHashMap<>();
        this.trace = new ExecutionTrace();
        this.createdAt = Instant.now();
    }

    public String tenantId() {
        return tenantId;
    }

    public String sessionId() {
        return sessionId;
    }

    public String threadId() {
        return threadId;
    }

    public List<Message> recentMessages() {
        return recentMessages;
    }

    public ExecutionTrace trace() {
        return trace;
    }

    public AgentContext put(String key, Object value) {
        attributes.put(key, value);
        return this;
    }

    public <T> T get(String key, Class<T> type) {
        Object value = attributes.get(key);
        return value == null ? null : type.cast(value);
    }
}

5.2 Redis 持久化与分布式会话

package com.example.agent.context;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Optional;

@Component
public class RedisAgentContextRepository {

    private static final Duration TTL = Duration.ofHours(12);

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    public RedisAgentContextRepository(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }

    public void save(AgentContextSnapshot snapshot) {
        try {
            String key = key(snapshot.tenantId(), snapshot.sessionId());
            String json = objectMapper.writeValueAsString(snapshot);
            redisTemplate.opsForValue().set(key, json, TTL);
        } catch (Exception e) {
            throw new ContextStoreException("Failed to save agent context", e);
        }
    }

    public Optional<AgentContextSnapshot> find(String tenantId, String sessionId) {
        try {
            String json = redisTemplate.opsForValue().get(key(tenantId, sessionId));
            if (json == null) {
                return Optional.empty();
            }
            return Optional.of(objectMapper.readValue(json, AgentContextSnapshot.class));
        } catch (Exception e) {
            throw new ContextStoreException("Failed to restore agent context", e);
        }
    }

    private String key(String tenantId, String sessionId) {
        return "agent:ctx:" + tenantId + ":" + sessionId;
    }
}

public record AgentContextSnapshot(
        String tenantId,
        String sessionId,
        String threadId,
        String summary,
        String recentMessagesJson,
        long updatedAt
) {
}

class ContextStoreException extends RuntimeException {
    ContextStoreException(String message, Throwable cause) {
        super(message, cause);
    }
}

5.3 上下文压缩策略

上下文窗口不是数据库,不能把所有历史都塞进去。推荐采用三段式上下文:

最终传给模型的上下文 = 系统指令 + 历史摘要 + 最近 N 轮对话 + 当前任务事实 + 必要检索片段

工程策略如下:

  • 最近消息窗口:保留最近 6 到 12 轮对话。
  • 摘要记忆:超过窗口的历史生成结构化摘要,而不是原文拼接。
  • 事实记忆:订单号、SKU、物流单号、用户选择等事实单独存储。
  • 语义检索:需要历史知识时,通过向量检索召回相关片段。
  • 敏感信息脱敏:身份证、手机号、银行卡等不应直接进入模型上下文。

六、路由策略:规则优先,模型补充,不要把所有决策交给 LLM

6.1 决策模型

package com.example.agent.routing;

import java.util.List;

public record UserIntent(
        String primaryIntent,
        double confidence,
        List<String> entities,
        boolean riskyOperation
) {
}

public record AgentDecision(
        String targetAgent,
        boolean parallel,
        List<String> collaboratorAgents,
        String reason
) {
    public static AgentDecision routeTo(String agentName, String reason) {
        return new AgentDecision(agentName, false, List.of(), reason);
    }

    public static AgentDecision parallel(List<String> agents, String reason) {
        return new AgentDecision("aggregator-agent", true, agents, reason);
    }
}

6.2 策略链:规则、语义、LLM 三段式

高并发 系统中,所有请求都让 LLM 做路由是不经济的。推荐用策略链:

package com.example.agent.routing;

public interface DecisionStrategy {
    boolean supports(UserIntent intent);

    AgentDecision decide(RoutingContext context, UserIntent intent);
}

public record RoutingContext(
        String tenantId,
        String userId,
        String sessionId,
        String input
) {
}
package com.example.agent.routing;

import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@Order(10)
public class RuleBasedDecisionStrategy implements DecisionStrategy {

    private final Map<String, String> mappings = Map.of(
            "order_query", "order-agent",
            "order_cancel", "order-agent",
            "refund_apply", "after-sale-agent",
            "product_inventory", "product-agent",
            "human_service", "human-handoff-agent"
    );

    @Override
    public boolean supports(UserIntent intent) {
        return intent.confidence() >= 0.85 && mappings.containsKey(intent.primaryIntent());
    }

    @Override
    public AgentDecision decide(RoutingContext context, UserIntent intent) {
        return AgentDecision.routeTo(mappings.get(intent.primaryIntent()),
                "rule matched: " + intent.primaryIntent());
    }
}
package com.example.agent.routing;

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Order(100)
public class LlmDecisionStrategy implements DecisionStrategy {

    private final ChatClient chatClient;

    public LlmDecisionStrategy(ChatClient.Builder builder) {
        this.chatClient = builder
                .defaultSystem("""
                        你是企业智能客服的路由器,只能输出 JSON。
                        可选 Agent:
                        - order-agent:订单查询、取消订单、支付状态
                        - product-agent:商品咨询、库存、推荐
                        - after-sale-agent:退款、退货、物流异常
                        - human-handoff-agent:投诉、低置信度、高风险操作
                        输出格式:{"targetAgent":"...","reason":"..."}
                        """)
                .build();
    }

    @Override
    public boolean supports(UserIntent intent) {
        return true;
    }

    @Override
    public AgentDecision decide(RoutingContext context, UserIntent intent) {
        String json = chatClient.prompt()
                .user("用户输入:" + context.input() + "\n意图:" + intent.primaryIntent())
                .call()
                .content();

        LlmRoutingResult result = LlmRoutingResultParser.parse(json);
        return AgentDecision.routeTo(result.targetAgent(),
                "llm routed: " + result.reason());
    }
}

record LlmRoutingResult(String targetAgent, String reason) {
}

生产建议:LLM 路由一定要加 JSON Schema、枚举约束、输出解析失败兜底,并记录原始输出用于调试。

七、Orchestrator:Multi-Agent 的核心执行引擎

Orchestrator 要解决的不只是"调用哪个 Agent",还包括:

  • 全链路超时预算。
  • 并行与串行编排。
  • 重试、熔断、降级。
  • 上下文读写。
  • 风控校验。
  • 结果聚合。
  • 审计与事件发布。

7.1 AgentRegistry

package com.example.agent.core;

import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class AgentRegistry {

    private final Map<String, Agent> localAgents = new ConcurrentHashMap<>();

    public AgentRegistry(Collection<Agent> agents) {
        agents.forEach(agent -> localAgents.put(agent.descriptor().name(), agent));
    }

    public Agent getRequired(String name) {
        Agent agent = localAgents.get(name);
        if (agent == null) {
            throw new AgentNotFoundException("Agent not found: " + name);
        }
        return agent;
    }

    public Collection<AgentDescriptor> descriptors() {
        return localAgents.values().stream().map(Agent::descriptor).toList();
    }
}

class AgentNotFoundException extends RuntimeException {
    AgentNotFoundException(String message) {
        super(message);
    }
}

7.2 生产级 Orchestrator

package com.example.agent.orchestration;

import com.example.agent.context.AgentContext;
import com.example.agent.core.Agent;
import com.example.agent.core.AgentRegistry;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import com.example.agent.routing.AgentDecision;
import com.example.agent.routing.DecisionStrategy;
import com.example.agent.routing.RoutingContext;
import com.example.agent.routing.UserIntent;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.retry.RetryRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

@Service
public class AgentOrchestrator {

    private static final Duration REQUEST_BUDGET = Duration.ofSeconds(12);

    private final AgentRegistry agentRegistry;
    private final IntentService intentService;
    private final List<DecisionStrategy> strategies;
    private final ExecutorService agentExecutor;
    private final MeterRegistry meterRegistry;
    private final RetryRegistry retryRegistry;
    private final CircuitBreakerRegistry circuitBreakerRegistry;

    public AgentOrchestrator(
            AgentRegistry agentRegistry,
            IntentService intentService,
            List<DecisionStrategy> strategies,
            ExecutorService agentExecutor,
            MeterRegistry meterRegistry,
            RetryRegistry retryRegistry,
            CircuitBreakerRegistry circuitBreakerRegistry
    ) {
        this.agentRegistry = agentRegistry;
        this.intentService = intentService;
        this.strategies = strategies.stream()
                .sorted(Comparator.comparingInt(strategy -> strategy.getClass().getAnnotation(org.springframework.core.annotation.Order.class) == null
                        ? Integer.MAX_VALUE
                        : strategy.getClass().getAnnotation(org.springframework.core.annotation.Order.class).value()))
                .toList();
        this.agentExecutor = agentExecutor;
        this.meterRegistry = meterRegistry;
        this.retryRegistry = retryRegistry;
        this.circuitBreakerRegistry = circuitBreakerRegistry;
    }

    public AgentResult orchestrate(OrchestrationCommand command, AgentContext context) {
        Instant deadline = Instant.now().plus(REQUEST_BUDGET);
        String requestId = UUID.randomUUID().toString();

        UserIntent intent = intentService.recognize(command.input(), context);
        context.put("intent", intent);

        AgentDecision decision = decide(command, intent);

        AgentResult result = decision.parallel()
                ? executeParallel(requestId, command, context, decision, deadline)
                : executeSingle(requestId, command, context, decision.targetAgent(), deadline);

        meterRegistry.counter("agent.orchestration.success", "target", decision.targetAgent()).increment();
        return result;
    }

    private AgentDecision decide(OrchestrationCommand command, UserIntent intent) {
        RoutingContext routingContext = new RoutingContext(
                command.tenantId(),
                command.userId(),
                command.sessionId(),
                command.input()
        );

        return strategies.stream()
                .filter(strategy -> strategy.supports(intent))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("No decision strategy available"))
                .decide(routingContext, intent);
    }

    private AgentResult executeSingle(
            String requestId,
            OrchestrationCommand command,
            AgentContext context,
            String agentName,
            Instant deadline
    ) {
        Agent agent = agentRegistry.getRequired(agentName);
        AgentRequest request = new AgentRequest(
                requestId,
                command.tenantId(),
                command.userId(),
                command.sessionId(),
                command.input(),
                context,
                Map.of(),
                deadline
        );

        return executeGuarded(agent, request);
    }

    private AgentResult executeParallel(
            String requestId,
            OrchestrationCommand command,
            AgentContext context,
            AgentDecision decision,
            Instant deadline
    ) {
        List<CompletableFuture<AgentResult>> futures = decision.collaboratorAgents().stream()
                .map(agentName -> CompletableFuture.supplyAsync(
                        () -> executeSingle(requestId, command, context, agentName, deadline),
                        agentExecutor
                ))
                .toList();

        List<AgentResult> results = futures.stream().map(CompletableFuture::join).toList();
        return AgentResultAggregator.aggregate(results);
    }

    private AgentResult executeGuarded(Agent agent, AgentRequest request) {
        String name = agent.descriptor().name();
        var retry = retryRegistry.retry(name);
        var circuitBreaker = circuitBreakerRegistry.circuitBreaker(name);

        return io.github.resilience4j.decorators.Decorators
                .ofSupplier(() -> agent.execute(request))
                .withCircuitBreaker(circuitBreaker)
                .withRetry(retry)
                .get();
    }
}

public record OrchestrationCommand(
        String tenantId,
        String userId,
        String sessionId,
        String input
) {
}

这段代码的核心点:

  • 所有请求都有 deadline,避免无限等待。
  • Agent 执行统一经过熔断和重试。
  • 并行 Agent 通过虚拟线程 Executor 执行。
  • Orchestrator 不直接依赖具体业务工具,业务能力封装在 Agent 内部。

八、一个完整业务案例:大促客服"查库存 + 查优惠 + 推荐替代品"

用户输入:

我想买 SKU-10086,上海还有货吗?如果没货,帮我推荐一个差不多的,最好能参加满减。

8.1 执行链路

8.2 Product Agent 示例

package com.example.agent.product;

import com.example.agent.core.Agent;
import com.example.agent.core.AgentDescriptor;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

@Component
public class ProductAgent implements Agent {

    private final ChatClient chatClient;
    private final ProductTools productTools;

    public ProductAgent(ChatClient.Builder builder, ProductTools productTools) {
        this.productTools = productTools;
        this.chatClient = builder
                .defaultSystem("""
                        你是电商商品咨询专家。
                        约束:
                        1. 库存、价格、优惠必须来自工具结果,不能猜测。
                        2. 如果原商品无货,需要基于推荐工具给出替代品。
                        3. 输出要简洁,先给结论,再给推荐理由。
                        4. 不要承诺下单成功,只能说明当前查询结果。
                        """)
                .defaultTools(productTools)
                .build();
    }

    @Override
    public AgentDescriptor descriptor() {
        return new AgentDescriptor(
                "product-agent",
                "1.0.0",
                "商品咨询、库存查询、相似商品推荐",
                Set.of("product_inventory", "product_recommendation", "promotion_query"),
                Set.of("inventory_query", "recommend_similar_product", "promotion_query"),
                Duration.ofSeconds(5),
                500,
                true,
                com.example.agent.core.AgentStatus.UP
        );
    }

    @Override
    public AgentResult execute(AgentRequest request) {
        String answer = chatClient.prompt()
                .user(request.input())
                .call()
                .content();

        return new AgentResult(
                descriptor().name(),
                answer,
                0.92,
                false,
                Map.of("domain", "product")
        );
    }
}

8.3 ProductTools 示例

package com.example.agent.product;

import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ProductTools {

    private final InventoryFacade inventoryFacade;
    private final RecommendationFacade recommendationFacade;
    private final PromotionFacade promotionFacade;

    public ProductTools(
            InventoryFacade inventoryFacade,
            RecommendationFacade recommendationFacade,
            PromotionFacade promotionFacade
    ) {
        this.inventoryFacade = inventoryFacade;
        this.recommendationFacade = recommendationFacade;
        this.promotionFacade = promotionFacade;
    }

    @Tool(name = "inventory_query", description = "查询指定 SKU 在城市维度的可售库存")
    public InventoryView queryInventory(
            @ToolParam(description = "SKU 编码") String sku,
            @ToolParam(description = "城市,例如 上海") String city
    ) {
        return inventoryFacade.query(sku, city);
    }

    @Tool(name = "recommend_similar_product", description = "当目标商品无货时,推荐相似商品")
    public List<ProductView> recommendSimilarProduct(
            @ToolParam(description = "原始 SKU 编码") String sku,
            @ToolParam(description = "最多返回数量") int limit
    ) {
        return recommendationFacade.similar(sku, Math.min(limit, 5));
    }

    @Tool(name = "promotion_query", description = "查询 SKU 是否参与当前促销活动")
    public PromotionView queryPromotion(
            @ToolParam(description = "SKU 编码") String sku,
            @ToolParam(description = "用户 ID") String userId
    ) {
        return promotionFacade.query(sku, userId);
    }
}

public record InventoryView(String sku, String city, int availableQuantity, String status) {
}

public record ProductView(String sku, String name, String sellingPoint, long priceInCent) {
}

public record PromotionView(String sku, boolean eligible, String description) {
}

九、高并发工程化:让 Agent 系统扛住真实流量

9.1 并发模型:虚拟线程适合 LLM I/O 密集场景

LLM 调用、向量检索、库存查询、物流查询大多是网络 I/O。Java 21 虚拟线程能显著降低阻塞等待时的线程成本。

package com.example.agent.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
public class AgentConcurrencyConfig {

    @Bean(destroyMethod = "close")
    public ExecutorService agentExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}

使用虚拟线程不等于可以无限并发。仍然必须加:

  • LLM Provider 维度限流。
  • Agent 维度最大并发。
  • Tool 维度限流。
  • 租户维度配额。
  • 请求级 deadline。

9.2 Resilience4j 配置

resilience4j:
  circuitbreaker:
    instances:
      product-agent:
        slidingWindowSize: 100
        failureRateThreshold: 50
        slowCallRateThreshold: 60
        slowCallDurationThreshold: 3s
        waitDurationInOpenState: 10s
        permittedNumberOfCallsInHalfOpenState: 10
      inventoryClient:
        slidingWindowSize: 200
        failureRateThreshold: 40
        waitDurationInOpenState: 5s

  retry:
    instances:
      product-agent:
        maxAttempts: 2
        waitDuration: 200ms
        retryExceptions:
          - java.net.SocketTimeoutException
          - org.springframework.web.client.ResourceAccessException

  ratelimiter:
    instances:
      inventoryClient:
        limitForPeriod: 300
        limitRefreshPeriod: 1s
        timeoutDuration: 50ms

9.3 流式输出

对用户体验来说,首 Token 延迟比总耗时更敏感。长答案应优先使用 SSE 或 WebSocket。

package com.example.agent.api;

import com.example.agent.context.AgentContextService;
import com.example.agent.core.AgentStreamChunk;
import com.example.agent.orchestration.StreamingAgentOrchestrator;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class AgentChatController {

    private final AgentContextService contextService;
    private final StreamingAgentOrchestrator orchestrator;

    public AgentChatController(AgentContextService contextService, StreamingAgentOrchestrator orchestrator) {
        this.contextService = contextService;
        this.orchestrator = orchestrator;
    }

    @GetMapping(value = "/api/agent/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<AgentStreamChunk> stream(
            @RequestParam String tenantId,
            @RequestParam String userId,
            @RequestParam String sessionId,
            @RequestParam String message
    ) {
        return contextService.getOrCreate(tenantId, sessionId)
                .flatMapMany(context -> orchestrator.stream(tenantId, userId, sessionId, message, context))
                .onErrorResume(error -> Flux.just(AgentStreamChunk.done("当前服务繁忙,请稍后再试。")));
    }
}

9.4 削峰与异步任务

不是所有 Agent 调用都应该同步完成。比如:

  • 生成复杂售后方案。
  • 分析长文档。
  • 创建人工工单。
  • 发送补偿优惠券。
  • 进行批量订单诊断。

这些任务应通过 Kafka 异步化:

package com.example.agent.event;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class AgentEventPublisher {

    private final KafkaTemplate<String, AgentDomainEvent> kafkaTemplate;

    public AgentEventPublisher(KafkaTemplate<String, AgentDomainEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(AgentDomainEvent event) {
        kafkaTemplate.send("agent-domain-events", event.sessionId(), event);
    }
}

public record AgentDomainEvent(
        String eventId,
        String tenantId,
        String sessionId,
        String type,
        String payload
) {
}

十、从单体 Agent 到分布式 Agent:A2A + Nacos

当 Agent 数量变多、团队边界变清晰后,建议将 Agent 服务化:

orchestrator-service
product-agent-service
order-agent-service
after-sale-agent-service
risk-agent-service

10.1 为什么需要 A2A

A2A 的价值类似"智能体世界里的服务协议":

  • Agent 可以通过 AgentCard 暴露名称、描述、能力、输入输出模式、URL、版本。
  • 调用方可以基于 AgentCard 发现远程 Agent。
  • 不同语言、不同团队实现的 Agent 可以互操作。
  • Nacos 这类注册中心可以承担 AgentCard 注册、发现和版本管理。

10.2 Nacos A2A Registry 配置示例

spring:
  application:
    name: product-agent-service
  ai:
    alibaba:
      a2a:
        nacos:
          server-addr: ${NACOS_ADDR:127.0.0.1:8848}
          username: ${NACOS_USERNAME:nacos}
          password: ${NACOS_PASSWORD:nacos}
          registry:
            enabled: true
          discovery:
            enabled: true
        server:
          version: 1.0.0
          card:
            name: product-agent
            description: 商品咨询、库存查询、相似商品推荐智能体
            provider:
              name: ecommerce-ai-platform
              organization: mall-ai-team

生产建议:

  • server.card.name 要与 Agent Bean 名称或注册名称保持一致。
  • 每个 Agent 服务独立部署,独立配置版本。
  • 多环境通过 Nacos namespace 隔离。
  • 灰度发布通过版本、标签或网关权重控制。

10.3 远程 Agent 适配器

Orchestrator 不应该感知远程调用细节,可以通过适配器把 A2A Remote Agent 包装成统一 Agent 接口。

package com.example.agent.remote;

import com.example.agent.core.Agent;
import com.example.agent.core.AgentDescriptor;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;
import java.util.Set;

@Component
public class RemoteProductAgentAdapter implements Agent {

    private final A2aAgentClient client;

    public RemoteProductAgentAdapter(A2aAgentClient client) {
        this.client = client;
    }

    @Override
    public AgentDescriptor descriptor() {
        return new AgentDescriptor(
                "product-agent",
                "1.0.0",
                "Remote product agent via A2A",
                Set.of("product_inventory", "product_recommendation"),
                Set.of(),
                Duration.ofSeconds(6),
                500,
                true,
                com.example.agent.core.AgentStatus.UP
        );
    }

    @Override
    public AgentResult execute(AgentRequest request) {
        RemoteAgentResponse response = client.send("product-agent", request.input(), request.context().threadId());
        return new AgentResult(
                descriptor().name(),
                response.content(),
                response.confidence(),
                response.requiresHuman(),
                Map.of("remote", true)
        );
    }
}

public interface A2aAgentClient {
    RemoteAgentResponse send(String agentName, String input, String threadId);
}

public record RemoteAgentResponse(String content, double confidence, boolean requiresHuman) {
}

十一、Kubernetes 部署与弹性伸缩

11.1 Agent 独立部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-agent
spec:
  replicas: 3
  selector:
    matchLabels:
      app: product-agent
  template:
    metadata:
      labels:
        app: product-agent
    spec:
      containers:
        - name: product-agent
          image: registry.example.com/ai/product-agent:1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: NACOS_ADDR
              value: nacos.default.svc.cluster.local:8848
            - name: SPRING_DATA_REDIS_HOST
              value: redis-master.default.svc.cluster.local
            - name: JAVA_TOOL_OPTIONS
              value: "-XX:MaxRAMPercentage=75 -XX:+UseG1GC"
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10

11.2 HPA

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: product-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: product-agent
  minReplicas: 3
  maxReplicas: 30
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 65
    - type: Pods
      pods:
        metric:
          name: agent_inflight_requests
        target:
          type: AverageValue
          averageValue: "80"

生产中建议不要只按 CPU 扩容。AI Agent 往往瓶颈在外部 LLM API、工具服务、向量库或连接池,应该增加自定义指标:

  • agent_inflight_requests
  • agent_request_latency_p95
  • llm_provider_rate_limit_errors
  • tool_call_latency_p95
  • redis_context_latency_p95

十二、安全、风控与人机协同

12.1 高风险操作不要直接执行

以下操作应进入 Risk Agent 或 Human-in-the-loop:

  • 取消订单。
  • 发起退款。
  • 修改地址。
  • 发放优惠券。
  • 查询敏感个人信息。
  • 执行运维变更。

推荐模式:

12.2 操作型工具必须幂等

package com.example.agent.skill.order;

import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;

@Component
public class OrderActionTools {

    private final OrderCommandService orderCommandService;

    public OrderActionTools(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @Tool(name = "cancel_order", description = "取消未支付订单。执行前必须已经获得用户明确确认。")
    public CancelOrderResult cancelOrder(
            @ToolParam(description = "订单号") String orderNo,
            @ToolParam(description = "幂等键,由 requestId + orderNo 生成") String idempotencyKey,
            @ToolParam(description = "用户确认文本") String confirmationText
    ) {
        if (!confirmationText.contains("确认取消")) {
            return new CancelOrderResult(orderNo, false, "缺少用户明确确认,未执行取消");
        }
        return orderCommandService.cancel(orderNo, idempotencyKey);
    }
}

public interface OrderCommandService {
    CancelOrderResult cancel(String orderNo, String idempotencyKey);
}

public record CancelOrderResult(String orderNo, boolean success, String message) {
}

十三、可观测性与成本治理

没有可观测性的 Agent 系统,线上问题会非常难排查。至少要打通四类指标。

13.1 链路追踪

每次请求都要贯穿:

traceId -> sessionId -> orchestrationId -> agentName -> toolName -> llmRequestId

13.2 指标体系

指标 维度 作用
agent.request.count agent、tenant、intent 流量分析
agent.request.latency agent、p50/p95/p99 性能瓶颈
agent.tool.error tool、exception 工具稳定性
agent.llm.tokens model、agent、tenant 成本核算
agent.route.confidence strategy、intent 路由质量
agent.handoff.count tenant、intent 人工兜底比例

13.3 Token 预算控制

package com.example.agent.cost;

import org.springframework.stereotype.Component;

@Component
public class TokenBudgetGuard {

    private final TenantBudgetService budgetService;

    public TokenBudgetGuard(TenantBudgetService budgetService) {
        this.budgetService = budgetService;
    }

    public void check(String tenantId, String agentName, int estimatedTokens) {
        TokenBudget budget = budgetService.currentBudget(tenantId);
        if (budget.remainingTokens() < estimatedTokens) {
            throw new TokenBudgetExceededException("Tenant token budget exceeded: " + tenantId);
        }
    }
}

public interface TenantBudgetService {
    TokenBudget currentBudget(String tenantId);
}

public record TokenBudget(long remainingTokens) {
}

class TokenBudgetExceededException extends RuntimeException {
    TokenBudgetExceededException(String message) {
        super(message);
    }
}

预算不足时的降级策略:

  • 优先使用规则回复。
  • 使用小模型做意图识别。
  • 禁用非必要推荐 Agent。
  • 缩短上下文窗口。
  • 引导人工客服。

十四、常见线上问题与解决方案

14.1 Token 消耗突然暴涨

常见原因:

  • 新增工具后所有 Agent 默认加载全部工具。
  • 历史上下文没有压缩。
  • RAG 召回片段过多。
  • 模型路由失败导致多 Agent 重复调用。

解决方案:

  • 工具按 Agent 和意图动态筛选。
  • 启用工具搜索模式,按需展开工具定义。
  • 设置单请求 Token 上限。
  • 对 RAG 片段做 rerank 和长度裁剪。
  • 按 Agent、租户、模型记录 token 指标。

14.2 Agent 间相互调用导致循环

解决方案:

  • AgentRequest 中记录 visitedAgents。
  • 限制最大调用深度,例如不超过 3 层。
  • Orchestrator 统一调度,禁止业务 Agent 私自无限递归调用。
  • 对循环调用做 trace 告警。

14.3 LLM 选择了错误工具

解决方案:

  • 工具名称要明确,避免 querysearch 这类模糊命名。
  • 工具描述要写清楚适用场景和禁止场景。
  • 对高风险工具增加规则前置判断。
  • 使用工具参数增强记录模型调用理由。
  • 对工具调用结果进行业务校验。

14.4 流式接口中途失败

解决方案:

  • SSE 事件增加 event: errorevent: done
  • 前端支持断线重试和会话恢复。
  • 后端按 chunk 持久化关键状态。
  • 对最终答案生成失败的场景返回事实型兜底答案。

14.5 Redis 上下文热点

解决方案:

  • session key 加租户前缀并合理分片。
  • 大对象拆分存储,摘要和最近消息分离。
  • 热点会话本地短 TTL 缓存,但关键状态仍以 Redis 为准。
  • 避免每个 token chunk 都写 Redis,改为阶段性 checkpoint。

十五、推荐落地路径

阶段一:单应用内模块化

适合 0 到 1 验证:

  • 一个 Spring Boot 应用。
  • 多个本地 Agent Bean。
  • 每个 Agent 绑定有限工具。
  • Redis 保存上下文。
  • Micrometer 记录基本指标。

阶段二:核心 Agent 服务化

适合业务量开始增长:

  • Orchestrator 独立服务。
  • 高频 Agent 独立部署。
  • Nacos 注册发现。
  • Kafka 处理异步任务。
  • K8s 独立扩容。

阶段三:企业级 Agent 平台

适合多团队、多业务线:

  • A2A 统一 Agent 互操作。
  • MCP 标准化外部工具接入。
  • Agent 评估集与回归测试。
  • Prompt、工具、模型版本统一治理。
  • 成本预算与审计平台化。

十六、上线检查清单

上线前至少确认:

  • 每个 Agent 有明确职责、版本、owner 和能力标签。
  • 每个工具有权限控制、超时、限流、熔断和审计。
  • 写操作工具具备幂等键和人工确认机制。
  • 上下文有租户隔离、压缩策略和敏感信息脱敏。
  • Orchestrator 有全链路 deadline 和降级策略。
  • LLM 输出解析失败时有兜底。
  • Agent 调用链有 traceId,能定位到具体工具和模型请求。
  • token 成本按租户、Agent、模型可统计。
  • 关键 Agent 有评估集和灰度发布机制。
  • K8s 探针、HPA、限流和告警已配置。

十七、总结

把大模型当成"万能工具人",本质上是在用 Prompt 承担架构职责。这个思路可以做 Demo,却很难支撑生产。

生产级 Multi-Agent 的核心不是让模型更"聪明",而是让系统更"可控":

  • 用 Agent 拆业务边界,降低单点认知负载。
  • 用 Skills/Tools 承载能力边界,并加入治理能力。
  • 用 Orchestrator 管理路由、并发、超时、降级和结果聚合。
  • 用 Redis、Kafka、Nacos、Kubernetes 把智能体放进成熟的分布式工程体系。
  • 用可观测性、评估集和成本预算,让 AI 系统可以持续运营。

Spring AI 的价值在于把 AI 能力带回 Java 工程体系;Spring AI Alibaba、A2A、MCP、Nacos 则进一步把 Agent 从单应用组件推向分布式协作单元。对于 Java 团队来说,Multi-Agent 的正确打开方式不是重新发明一套 AI 平台,而是把 Agent 当成新型微服务,用成熟的软件工程方法治理它。

参考资料

  • Spring AI Tool Calling Reference
  • Spring AI Dynamic Tool Discovery
  • Spring AI Alibaba A2A Agent 文档
  • Nacos A2A Registry 文档
  • Spring AI A2A Integration Blog



上一篇:AI时代招人新标准:学历贬值,解题力不如命题力
下一篇:谷歌第八代TPU拆成两颗:推训分离硬刚英伟达,但更可怕的还是电
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-25 11:20 , Processed in 0.644820 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表