单 Agent 能跑通 Demo,Multi-Agent 才能承载生产。真正的分水岭不是 Prompt 写得多漂亮,而是能否把智能体能力拆成可治理、可扩展、可观测、可灰度的工程系统。
版本与适用范围
本文面向 Java/Spring 技术栈,重点讨论如何基于 Spring AI、Spring AI Alibaba、A2A、Nacos、Redis、Kafka 与 Kubernetes 构建生产级 Multi-Agent 系统。
示例代码以 Java 21、Spring Boot 3.x、Spring AI 1.x API 风格为主,A2A 与 Nacos 部分参考 Spring AI Alibaba 1.0.0.4+ 及 Spring AI A2A 社区实现。由于 Spring AI 生态迭代较快,生产落地时应通过 BOM 锁定依赖版本,并以官方文档中的当前 API 为准。
一、为什么单 Agent 到生产一定会遇到天花板
很多团队做 AI 应用的第一版,通常是这样的:
一个超级系统提示词 + 一堆工具 + 一个 ChatClient = 一个看起来什么都会的 Agent
这个方案在原型阶段很快,但一旦进入电商客服、金融投顾、企业知识库、售后工单、运维诊断这类真实场景,就会暴露出结构性问题。
1.1 单 Agent 的四个核心问题
第一,认知负载过高。
一个 Agent 同时负责意图识别、订单查询、库存判断、优惠计算、退款审核、物流追踪和人工升级,模型需要在大量工具和规则之间做选择。工具越多,选择越不稳定,Prompt 越长,推理噪声越大。
第二,上下文污染严重。
订单问题、商品问题、售后问题、闲聊内容混在同一个上下文窗口里,模型很容易把历史信息误用于当前任务。例如用户上一轮问过 A 商品库存,下一轮查询 B 商品退款,模型可能错误复用 A 商品上下文。
第三,工程扩展性差。
单 Agent 无法按能力独立扩容。大促期间订单查询暴涨,本应只扩容订单 Agent,却不得不扩容整个 AI 服务,导致商品咨询、售后问答、闲聊能力也被动扩容,成本不可控。
第四,治理边界不清。
在企业里,订单团队、库存团队、售后团队、风控团队通常是不同组织。单 Agent 模式会把所有业务规则、工具定义和 Prompt 堆在一起,既难以审计,也难以灰度发布。
1.2 Multi-Agent 的本质不是"多个机器人聊天"
生产级 Multi-Agent 不是让几个大模型互相对话,而是把智能能力拆成多个具备明确职责、工具权限、上下文边界和治理策略的执行单元。
一个典型智能客服系统可以拆成:
| Agent |
职责 |
可访问工具 |
扩容特征 |
| Intent Agent |
意图识别、置信度评估 |
意图分类模型、规则库 |
CPU 轻、低延迟 |
| Order Agent |
订单查询、取消、退款前置判断 |
订单服务、支付服务 |
大促高峰明显 |
| Product Agent |
商品咨询、库存、推荐 |
商品服务、库存服务、向量检索 |
搜索链路重 |
| AfterSale Agent |
物流、退换货、纠纷处理 |
物流服务、售后系统 |
长链路、强规则 |
| Risk Agent |
风控校验、敏感操作拦截 |
风控规则、黑名单、审计系统 |
强一致、低容错 |
| Human Handoff Agent |
人工升级、工单生成 |
CRM、工单系统 |
业务兜底 |
Multi-Agent 的工程价值在于:
- 职责单一:每个 Agent 只处理一个业务域,Prompt 更短,工具更少,决策更稳定。
- 独立扩容:订单 Agent、售后 Agent、商品 Agent 可以按流量特征独立伸缩。
- 故障隔离:库存工具异常不应拖垮退款链路,推荐服务超时不应影响订单查询。
- 独立治理:不同团队可以独立维护自己的 Agent、工具、Prompt 和评估集。
- 安全可控:高风险能力可以挂接 Risk Agent 与人工确认,不把关键操作完全交给模型。
二、生产级 Multi-Agent 总体架构
2.1 分层架构
这套架构有一个重要原则:LLM 只负责需要推理的部分,不负责系统治理。
也就是说,权限、限流、熔断、幂等、审计、灰度、租户隔离、预算控制、结果校验,都应该由工程系统兜住,而不是写进 Prompt 后祈祷模型遵守。
2.2 核心组件职责
| 组件 |
职责 |
关键设计点 |
| Orchestrator |
接收请求、识别意图、路由 Agent、聚合结果 |
超时预算、并发执行、降级策略 |
| Agent Registry |
管理本地与远程 Agent 元数据 |
能力标签、版本、健康状态、灰度权重 |
| Context Manager |
管理会话、任务、链路上下文 |
Redis 持久化、上下文压缩、租户隔离 |
| Tool Registry |
管理工具定义与执行入口 |
动态工具发现、权限控制、审计 |
| Guardrail |
输入输出安全与业务风控 |
敏感词、PII、越权、人工确认 |
| Event Bus |
异步任务和领域事件 |
Kafka 解耦、削峰、最终一致 |
| Observability |
可观测与成本治理 |
traceId、token、latency、tool error |
2.3 技术选型建议
| 场景 |
推荐方案 |
| 快速原型 |
Spring AI + ChatClient + 本地工具 |
| 中小规模生产 |
Spring AI + Redis ChatMemory/Context + Resilience4j + Micrometer |
| 多团队协作 |
Spring AI Alibaba + Nacos A2A Registry + 独立 Agent 服务 |
| 高并发场景 |
Java 21 虚拟线程 + Reactor 流式输出 + Redis 限流 + K8s HPA |
| 企业级治理 |
A2A + MCP + Kafka + OpenTelemetry + 评估集 + 灰度平台 |
三、领域建模:先把 Agent 当成工程组件,而不是 Prompt
3.1 Agent 元数据模型
生产环境中,一个 Agent 至少要描述这些信息:
package com.example.agent.core;
import java.time.Duration;
import java.util.Set;
public record AgentDescriptor(
String name,
String version,
String description,
Set<String> capabilities,
Set<String> allowedToolNames,
Duration timeout,
int maxConcurrency,
boolean streamingSupported,
AgentStatus status
) {
public boolean canHandle(String capability) {
return status == AgentStatus.UP && capabilities.contains(capability);
}
}
enum AgentStatus {
UP, DEGRADED, DOWN
}
这里的重点不是字段本身,而是架构思想:Agent 必须像微服务一样被描述、注册、发现、限流、熔断和观测。
3.2 统一执行接口
package com.example.agent.core;
import reactor.core.publisher.Flux;
public interface Agent {
AgentDescriptor descriptor();
AgentResult execute(AgentRequest request);
default Flux<AgentStreamChunk> stream(AgentRequest request) {
return Flux.just(AgentStreamChunk.done(execute(request).answer()));
}
}
package com.example.agent.core;
import java.time.Instant;
import java.util.Map;
public record AgentRequest(
String requestId,
String tenantId,
String userId,
String sessionId,
String input,
AgentContext context,
Map<String, Object> attributes,
Instant deadline
) {
public boolean expired() {
return Instant.now().isAfter(deadline);
}
}
public record AgentResult(
String agentName,
String answer,
double confidence,
boolean requiresHuman,
Map<String, Object> facts
) {
public static AgentResult humanRequired(String agentName, String reason) {
return new AgentResult(agentName, reason, 0.0, true, Map.of("reason", reason));
}
}
public record AgentStreamChunk(String content, boolean done) {
public static AgentStreamChunk chunk(String content) {
return new AgentStreamChunk(content, false);
}
public static AgentStreamChunk done(String content) {
return new AgentStreamChunk(content, true);
}
}
统一接口的好处是,Orchestrator 不需要关心底层是本地 Agent、远程 A2A Agent、Graph 工作流,还是一个普通 Java 服务。
Spring AI 的 Tool Calling 把 Java 方法或函数暴露给模型,让模型在需要时调用业务系统。官方抽象里,工具最终会被建模为 ToolCallback,可以通过 @Tool、函数式 Bean 或自定义 ToolCallback 提供。
但生产环境不能只停留在 @Tool。一个可上线的 Skill 至少要具备:
- 元数据:名称、版本、负责人、风险等级、所属业务域。
- 权限:哪些 Agent、租户、用户角色可以调用。
- 治理:超时、重试、熔断、限流、降级。
- 审计:谁在什么上下文里调用了什么工具,参数和结果是否脱敏。
- 幂等:创建工单、取消订单、发起退款等写操作必须可重复提交。
4.1 Skill 元数据
package com.example.agent.skill;
import java.time.Duration;
import java.util.Set;
public record SkillMetadata(
String name,
String version,
String owner,
String domain,
SkillRiskLevel riskLevel,
Set<String> allowedAgents,
Duration timeout,
int requestsPerSecond,
boolean returnDirect
) {
}
enum SkillRiskLevel {
LOW, MEDIUM, HIGH
}
4.2 生产级库存查询 Skill
下面的代码展示一个更接近生产的工具实现:包含参数校验、限流、超时、指标、审计与异常转换。
package com.example.agent.skill.inventory;
import com.example.agent.skill.SkillMetadata;
import com.example.agent.skill.SkillRiskLevel;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.ratelimiter.annotation.RateLimiter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import jakarta.validation.constraints.NotBlank;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Set;
@Component
public class InventorySkill {
private static final Logger log = LoggerFactory.getLogger(InventorySkill.class);
private final InventoryClient inventoryClient;
private final MeterRegistry meterRegistry;
public InventorySkill(InventoryClient inventoryClient, MeterRegistry meterRegistry) {
this.inventoryClient = inventoryClient;
this.meterRegistry = meterRegistry;
}
public SkillMetadata metadata() {
return new SkillMetadata(
"inventory_query",
"1.0.0",
"inventory-team",
"product",
SkillRiskLevel.LOW,
Set.of("product-agent", "orchestrator-agent"),
Duration.ofSeconds(2),
300,
false
);
}
@Tool(name = "inventory_query", description = "查询指定 SKU 在指定仓库的可售库存。只用于库存咨询,不用于锁库存。")
@CircuitBreaker(name = "inventoryClient", fallbackMethod = "fallback")
@RateLimiter(name = "inventoryClient")
public InventoryResponse queryInventory(
@ToolParam(description = "商品 SKU 编码,例如 SKU-10086") @NotBlank String sku,
@ToolParam(description = "仓库编码,可为空;为空时查询全国汇总库存") String warehouseCode
) {
Timer.Sample sample = Timer.start(meterRegistry);
try {
InventoryResponse response = inventoryClient.query(sku, warehouseCode);
meterRegistry.counter("agent.skill.success", "skill", "inventory_query").increment();
return response;
} catch (RuntimeException error) {
meterRegistry.counter("agent.skill.error", "skill", "inventory_query").increment();
log.warn("Inventory query failed, sku={}, warehouse={}", sku, warehouseCode, error);
throw error;
} finally {
sample.stop(meterRegistry.timer("agent.skill.duration", "skill", "inventory_query"));
}
}
public InventoryResponse fallback(String sku, String warehouseCode, Throwable error) {
return new InventoryResponse(sku, warehouseCode, -1, "INVENTORY_TEMPORARILY_UNAVAILABLE");
}
}
public interface InventoryClient {
InventoryResponse query(String sku, String warehouseCode);
}
public record InventoryResponse(
String sku,
String warehouseCode,
int availableQuantity,
String status
) {
}
注意:Spring AI 工具方法对参数和返回类型有一定限制。生产中建议把"模型可见的工具定义"和"内部客户端实现"分层:Tool 方法保持简单稳定,HTTP 连接超时、读取超时、线程隔离、熔断限流交给底层客户端和治理组件处理。
4.3 动态工具发现:不要一次性把所有工具塞给模型
当工具数量超过几十个时,如果每次请求都把所有工具定义传给模型,会产生三个问题:
- Token 成本暴涨。
- 模型在大量相似工具中选择错误的概率上升。
- 工具描述泄露过多内部能力,增加安全风险。
更好的做法是按 Agent、意图和权限动态筛选工具,必要时使用 Spring AI 的动态工具搜索模式:模型初始只看到一个"工具搜索工具",需要能力时再搜索并展开相关工具定义。
package com.example.agent.skill;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class GovernedToolRegistry {
private final Map<String, GovernedTool> tools = new ConcurrentHashMap<>();
public void register(GovernedTool tool) {
tools.put(tool.metadata().name(), tool);
}
public List<ToolCallback> resolveFor(String agentName, String tenantId, String intent) {
return tools.values().stream()
.filter(tool -> tool.metadata().allowedAgents().contains(agentName))
.filter(tool -> tool.policy().tenantAllowed(tenantId))
.filter(tool -> tool.policy().intentAllowed(intent))
.map(GovernedTool::callback)
.toList();
}
}
public record GovernedTool(
SkillMetadata metadata,
ToolPolicy policy,
ToolCallback callback
) {
}
public interface ToolPolicy {
boolean tenantAllowed(String tenantId);
boolean intentAllowed(String intent);
}
五、上下文工程:让 Agent 记该记的,忘该忘的
Multi-Agent 系统里,上下文管理比单 Agent 更复杂。因为上下文至少分为四类:
| 上下文类型 |
示例 |
生命周期 |
| Request Context |
requestId、traceId、deadline |
单次请求 |
| Session Context |
最近对话、用户偏好 |
一个会话 |
| Task Context |
当前任务状态、调用过的 Agent |
一个任务 |
| Long-term Memory |
用户长期偏好、历史工单摘要 |
跨会话 |
5.1 AgentContext 设计
package com.example.agent.context;
import org.springframework.ai.chat.messages.Message;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class AgentContext {
private final String tenantId;
private final String sessionId;
private final String threadId;
private final List<Message> recentMessages;
private final Map<String, Object> attributes;
private final ExecutionTrace trace;
private final Instant createdAt;
public AgentContext(String tenantId, String sessionId, String threadId) {
this.tenantId = tenantId;
this.sessionId = sessionId;
this.threadId = threadId;
this.recentMessages = new ArrayList<>();
this.attributes = new ConcurrentHashMap<>();
this.trace = new ExecutionTrace();
this.createdAt = Instant.now();
}
public String tenantId() {
return tenantId;
}
public String sessionId() {
return sessionId;
}
public String threadId() {
return threadId;
}
public List<Message> recentMessages() {
return recentMessages;
}
public ExecutionTrace trace() {
return trace;
}
public AgentContext put(String key, Object value) {
attributes.put(key, value);
return this;
}
public <T> T get(String key, Class<T> type) {
Object value = attributes.get(key);
return value == null ? null : type.cast(value);
}
}
5.2 Redis 持久化与分布式会话
package com.example.agent.context;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Optional;
@Component
public class RedisAgentContextRepository {
private static final Duration TTL = Duration.ofHours(12);
private final StringRedisTemplate redisTemplate;
private final ObjectMapper objectMapper;
public RedisAgentContextRepository(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.objectMapper = objectMapper;
}
public void save(AgentContextSnapshot snapshot) {
try {
String key = key(snapshot.tenantId(), snapshot.sessionId());
String json = objectMapper.writeValueAsString(snapshot);
redisTemplate.opsForValue().set(key, json, TTL);
} catch (Exception e) {
throw new ContextStoreException("Failed to save agent context", e);
}
}
public Optional<AgentContextSnapshot> find(String tenantId, String sessionId) {
try {
String json = redisTemplate.opsForValue().get(key(tenantId, sessionId));
if (json == null) {
return Optional.empty();
}
return Optional.of(objectMapper.readValue(json, AgentContextSnapshot.class));
} catch (Exception e) {
throw new ContextStoreException("Failed to restore agent context", e);
}
}
private String key(String tenantId, String sessionId) {
return "agent:ctx:" + tenantId + ":" + sessionId;
}
}
public record AgentContextSnapshot(
String tenantId,
String sessionId,
String threadId,
String summary,
String recentMessagesJson,
long updatedAt
) {
}
class ContextStoreException extends RuntimeException {
ContextStoreException(String message, Throwable cause) {
super(message, cause);
}
}
5.3 上下文压缩策略
上下文窗口不是数据库,不能把所有历史都塞进去。推荐采用三段式上下文:
最终传给模型的上下文 = 系统指令 + 历史摘要 + 最近 N 轮对话 + 当前任务事实 + 必要检索片段
工程策略如下:
- 最近消息窗口:保留最近 6 到 12 轮对话。
- 摘要记忆:超过窗口的历史生成结构化摘要,而不是原文拼接。
- 事实记忆:订单号、SKU、物流单号、用户选择等事实单独存储。
- 语义检索:需要历史知识时,通过向量检索召回相关片段。
- 敏感信息脱敏:身份证、手机号、银行卡等不应直接进入模型上下文。
六、路由策略:规则优先,模型补充,不要把所有决策交给 LLM
6.1 决策模型
package com.example.agent.routing;
import java.util.List;
public record UserIntent(
String primaryIntent,
double confidence,
List<String> entities,
boolean riskyOperation
) {
}
public record AgentDecision(
String targetAgent,
boolean parallel,
List<String> collaboratorAgents,
String reason
) {
public static AgentDecision routeTo(String agentName, String reason) {
return new AgentDecision(agentName, false, List.of(), reason);
}
public static AgentDecision parallel(List<String> agents, String reason) {
return new AgentDecision("aggregator-agent", true, agents, reason);
}
}
6.2 策略链:规则、语义、LLM 三段式
高并发 系统中,所有请求都让 LLM 做路由是不经济的。推荐用策略链:
package com.example.agent.routing;
public interface DecisionStrategy {
boolean supports(UserIntent intent);
AgentDecision decide(RoutingContext context, UserIntent intent);
}
public record RoutingContext(
String tenantId,
String userId,
String sessionId,
String input
) {
}
package com.example.agent.routing;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@Order(10)
public class RuleBasedDecisionStrategy implements DecisionStrategy {
private final Map<String, String> mappings = Map.of(
"order_query", "order-agent",
"order_cancel", "order-agent",
"refund_apply", "after-sale-agent",
"product_inventory", "product-agent",
"human_service", "human-handoff-agent"
);
@Override
public boolean supports(UserIntent intent) {
return intent.confidence() >= 0.85 && mappings.containsKey(intent.primaryIntent());
}
@Override
public AgentDecision decide(RoutingContext context, UserIntent intent) {
return AgentDecision.routeTo(mappings.get(intent.primaryIntent()),
"rule matched: " + intent.primaryIntent());
}
}
package com.example.agent.routing;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(100)
public class LlmDecisionStrategy implements DecisionStrategy {
private final ChatClient chatClient;
public LlmDecisionStrategy(ChatClient.Builder builder) {
this.chatClient = builder
.defaultSystem("""
你是企业智能客服的路由器,只能输出 JSON。
可选 Agent:
- order-agent:订单查询、取消订单、支付状态
- product-agent:商品咨询、库存、推荐
- after-sale-agent:退款、退货、物流异常
- human-handoff-agent:投诉、低置信度、高风险操作
输出格式:{"targetAgent":"...","reason":"..."}
""")
.build();
}
@Override
public boolean supports(UserIntent intent) {
return true;
}
@Override
public AgentDecision decide(RoutingContext context, UserIntent intent) {
String json = chatClient.prompt()
.user("用户输入:" + context.input() + "\n意图:" + intent.primaryIntent())
.call()
.content();
LlmRoutingResult result = LlmRoutingResultParser.parse(json);
return AgentDecision.routeTo(result.targetAgent(),
"llm routed: " + result.reason());
}
}
record LlmRoutingResult(String targetAgent, String reason) {
}
生产建议:LLM 路由一定要加 JSON Schema、枚举约束、输出解析失败兜底,并记录原始输出用于调试。
七、Orchestrator:Multi-Agent 的核心执行引擎
Orchestrator 要解决的不只是"调用哪个 Agent",还包括:
- 全链路超时预算。
- 并行与串行编排。
- 重试、熔断、降级。
- 上下文读写。
- 风控校验。
- 结果聚合。
- 审计与事件发布。
7.1 AgentRegistry
package com.example.agent.core;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class AgentRegistry {
private final Map<String, Agent> localAgents = new ConcurrentHashMap<>();
public AgentRegistry(Collection<Agent> agents) {
agents.forEach(agent -> localAgents.put(agent.descriptor().name(), agent));
}
public Agent getRequired(String name) {
Agent agent = localAgents.get(name);
if (agent == null) {
throw new AgentNotFoundException("Agent not found: " + name);
}
return agent;
}
public Collection<AgentDescriptor> descriptors() {
return localAgents.values().stream().map(Agent::descriptor).toList();
}
}
class AgentNotFoundException extends RuntimeException {
AgentNotFoundException(String message) {
super(message);
}
}
7.2 生产级 Orchestrator
package com.example.agent.orchestration;
import com.example.agent.context.AgentContext;
import com.example.agent.core.Agent;
import com.example.agent.core.AgentRegistry;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import com.example.agent.routing.AgentDecision;
import com.example.agent.routing.DecisionStrategy;
import com.example.agent.routing.RoutingContext;
import com.example.agent.routing.UserIntent;
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.retry.RetryRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@Service
public class AgentOrchestrator {
private static final Duration REQUEST_BUDGET = Duration.ofSeconds(12);
private final AgentRegistry agentRegistry;
private final IntentService intentService;
private final List<DecisionStrategy> strategies;
private final ExecutorService agentExecutor;
private final MeterRegistry meterRegistry;
private final RetryRegistry retryRegistry;
private final CircuitBreakerRegistry circuitBreakerRegistry;
public AgentOrchestrator(
AgentRegistry agentRegistry,
IntentService intentService,
List<DecisionStrategy> strategies,
ExecutorService agentExecutor,
MeterRegistry meterRegistry,
RetryRegistry retryRegistry,
CircuitBreakerRegistry circuitBreakerRegistry
) {
this.agentRegistry = agentRegistry;
this.intentService = intentService;
this.strategies = strategies.stream()
.sorted(Comparator.comparingInt(strategy -> strategy.getClass().getAnnotation(org.springframework.core.annotation.Order.class) == null
? Integer.MAX_VALUE
: strategy.getClass().getAnnotation(org.springframework.core.annotation.Order.class).value()))
.toList();
this.agentExecutor = agentExecutor;
this.meterRegistry = meterRegistry;
this.retryRegistry = retryRegistry;
this.circuitBreakerRegistry = circuitBreakerRegistry;
}
public AgentResult orchestrate(OrchestrationCommand command, AgentContext context) {
Instant deadline = Instant.now().plus(REQUEST_BUDGET);
String requestId = UUID.randomUUID().toString();
UserIntent intent = intentService.recognize(command.input(), context);
context.put("intent", intent);
AgentDecision decision = decide(command, intent);
AgentResult result = decision.parallel()
? executeParallel(requestId, command, context, decision, deadline)
: executeSingle(requestId, command, context, decision.targetAgent(), deadline);
meterRegistry.counter("agent.orchestration.success", "target", decision.targetAgent()).increment();
return result;
}
private AgentDecision decide(OrchestrationCommand command, UserIntent intent) {
RoutingContext routingContext = new RoutingContext(
command.tenantId(),
command.userId(),
command.sessionId(),
command.input()
);
return strategies.stream()
.filter(strategy -> strategy.supports(intent))
.findFirst()
.orElseThrow(() -> new IllegalStateException("No decision strategy available"))
.decide(routingContext, intent);
}
private AgentResult executeSingle(
String requestId,
OrchestrationCommand command,
AgentContext context,
String agentName,
Instant deadline
) {
Agent agent = agentRegistry.getRequired(agentName);
AgentRequest request = new AgentRequest(
requestId,
command.tenantId(),
command.userId(),
command.sessionId(),
command.input(),
context,
Map.of(),
deadline
);
return executeGuarded(agent, request);
}
private AgentResult executeParallel(
String requestId,
OrchestrationCommand command,
AgentContext context,
AgentDecision decision,
Instant deadline
) {
List<CompletableFuture<AgentResult>> futures = decision.collaboratorAgents().stream()
.map(agentName -> CompletableFuture.supplyAsync(
() -> executeSingle(requestId, command, context, agentName, deadline),
agentExecutor
))
.toList();
List<AgentResult> results = futures.stream().map(CompletableFuture::join).toList();
return AgentResultAggregator.aggregate(results);
}
private AgentResult executeGuarded(Agent agent, AgentRequest request) {
String name = agent.descriptor().name();
var retry = retryRegistry.retry(name);
var circuitBreaker = circuitBreakerRegistry.circuitBreaker(name);
return io.github.resilience4j.decorators.Decorators
.ofSupplier(() -> agent.execute(request))
.withCircuitBreaker(circuitBreaker)
.withRetry(retry)
.get();
}
}
public record OrchestrationCommand(
String tenantId,
String userId,
String sessionId,
String input
) {
}
这段代码的核心点:
- 所有请求都有 deadline,避免无限等待。
- Agent 执行统一经过熔断和重试。
- 并行 Agent 通过虚拟线程 Executor 执行。
- Orchestrator 不直接依赖具体业务工具,业务能力封装在 Agent 内部。
八、一个完整业务案例:大促客服"查库存 + 查优惠 + 推荐替代品"
用户输入:
我想买 SKU-10086,上海还有货吗?如果没货,帮我推荐一个差不多的,最好能参加满减。
8.1 执行链路
8.2 Product Agent 示例
package com.example.agent.product;
import com.example.agent.core.Agent;
import com.example.agent.core.AgentDescriptor;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
@Component
public class ProductAgent implements Agent {
private final ChatClient chatClient;
private final ProductTools productTools;
public ProductAgent(ChatClient.Builder builder, ProductTools productTools) {
this.productTools = productTools;
this.chatClient = builder
.defaultSystem("""
你是电商商品咨询专家。
约束:
1. 库存、价格、优惠必须来自工具结果,不能猜测。
2. 如果原商品无货,需要基于推荐工具给出替代品。
3. 输出要简洁,先给结论,再给推荐理由。
4. 不要承诺下单成功,只能说明当前查询结果。
""")
.defaultTools(productTools)
.build();
}
@Override
public AgentDescriptor descriptor() {
return new AgentDescriptor(
"product-agent",
"1.0.0",
"商品咨询、库存查询、相似商品推荐",
Set.of("product_inventory", "product_recommendation", "promotion_query"),
Set.of("inventory_query", "recommend_similar_product", "promotion_query"),
Duration.ofSeconds(5),
500,
true,
com.example.agent.core.AgentStatus.UP
);
}
@Override
public AgentResult execute(AgentRequest request) {
String answer = chatClient.prompt()
.user(request.input())
.call()
.content();
return new AgentResult(
descriptor().name(),
answer,
0.92,
false,
Map.of("domain", "product")
);
}
}
package com.example.agent.product;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ProductTools {
private final InventoryFacade inventoryFacade;
private final RecommendationFacade recommendationFacade;
private final PromotionFacade promotionFacade;
public ProductTools(
InventoryFacade inventoryFacade,
RecommendationFacade recommendationFacade,
PromotionFacade promotionFacade
) {
this.inventoryFacade = inventoryFacade;
this.recommendationFacade = recommendationFacade;
this.promotionFacade = promotionFacade;
}
@Tool(name = "inventory_query", description = "查询指定 SKU 在城市维度的可售库存")
public InventoryView queryInventory(
@ToolParam(description = "SKU 编码") String sku,
@ToolParam(description = "城市,例如 上海") String city
) {
return inventoryFacade.query(sku, city);
}
@Tool(name = "recommend_similar_product", description = "当目标商品无货时,推荐相似商品")
public List<ProductView> recommendSimilarProduct(
@ToolParam(description = "原始 SKU 编码") String sku,
@ToolParam(description = "最多返回数量") int limit
) {
return recommendationFacade.similar(sku, Math.min(limit, 5));
}
@Tool(name = "promotion_query", description = "查询 SKU 是否参与当前促销活动")
public PromotionView queryPromotion(
@ToolParam(description = "SKU 编码") String sku,
@ToolParam(description = "用户 ID") String userId
) {
return promotionFacade.query(sku, userId);
}
}
public record InventoryView(String sku, String city, int availableQuantity, String status) {
}
public record ProductView(String sku, String name, String sellingPoint, long priceInCent) {
}
public record PromotionView(String sku, boolean eligible, String description) {
}
九、高并发工程化:让 Agent 系统扛住真实流量
9.1 并发模型:虚拟线程适合 LLM I/O 密集场景
LLM 调用、向量检索、库存查询、物流查询大多是网络 I/O。Java 21 虚拟线程能显著降低阻塞等待时的线程成本。
package com.example.agent.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Configuration
public class AgentConcurrencyConfig {
@Bean(destroyMethod = "close")
public ExecutorService agentExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
}
使用虚拟线程不等于可以无限并发。仍然必须加:
- LLM Provider 维度限流。
- Agent 维度最大并发。
- Tool 维度限流。
- 租户维度配额。
- 请求级 deadline。
9.2 Resilience4j 配置
resilience4j:
circuitbreaker:
instances:
product-agent:
slidingWindowSize: 100
failureRateThreshold: 50
slowCallRateThreshold: 60
slowCallDurationThreshold: 3s
waitDurationInOpenState: 10s
permittedNumberOfCallsInHalfOpenState: 10
inventoryClient:
slidingWindowSize: 200
failureRateThreshold: 40
waitDurationInOpenState: 5s
retry:
instances:
product-agent:
maxAttempts: 2
waitDuration: 200ms
retryExceptions:
- java.net.SocketTimeoutException
- org.springframework.web.client.ResourceAccessException
ratelimiter:
instances:
inventoryClient:
limitForPeriod: 300
limitRefreshPeriod: 1s
timeoutDuration: 50ms
9.3 流式输出
对用户体验来说,首 Token 延迟比总耗时更敏感。长答案应优先使用 SSE 或 WebSocket。
package com.example.agent.api;
import com.example.agent.context.AgentContextService;
import com.example.agent.core.AgentStreamChunk;
import com.example.agent.orchestration.StreamingAgentOrchestrator;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
public class AgentChatController {
private final AgentContextService contextService;
private final StreamingAgentOrchestrator orchestrator;
public AgentChatController(AgentContextService contextService, StreamingAgentOrchestrator orchestrator) {
this.contextService = contextService;
this.orchestrator = orchestrator;
}
@GetMapping(value = "/api/agent/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<AgentStreamChunk> stream(
@RequestParam String tenantId,
@RequestParam String userId,
@RequestParam String sessionId,
@RequestParam String message
) {
return contextService.getOrCreate(tenantId, sessionId)
.flatMapMany(context -> orchestrator.stream(tenantId, userId, sessionId, message, context))
.onErrorResume(error -> Flux.just(AgentStreamChunk.done("当前服务繁忙,请稍后再试。")));
}
}
9.4 削峰与异步任务
不是所有 Agent 调用都应该同步完成。比如:
- 生成复杂售后方案。
- 分析长文档。
- 创建人工工单。
- 发送补偿优惠券。
- 进行批量订单诊断。
这些任务应通过 Kafka 异步化:
package com.example.agent.event;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class AgentEventPublisher {
private final KafkaTemplate<String, AgentDomainEvent> kafkaTemplate;
public AgentEventPublisher(KafkaTemplate<String, AgentDomainEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void publish(AgentDomainEvent event) {
kafkaTemplate.send("agent-domain-events", event.sessionId(), event);
}
}
public record AgentDomainEvent(
String eventId,
String tenantId,
String sessionId,
String type,
String payload
) {
}
十、从单体 Agent 到分布式 Agent:A2A + Nacos
当 Agent 数量变多、团队边界变清晰后,建议将 Agent 服务化:
orchestrator-service
product-agent-service
order-agent-service
after-sale-agent-service
risk-agent-service
10.1 为什么需要 A2A
A2A 的价值类似"智能体世界里的服务协议":
- Agent 可以通过 AgentCard 暴露名称、描述、能力、输入输出模式、URL、版本。
- 调用方可以基于 AgentCard 发现远程 Agent。
- 不同语言、不同团队实现的 Agent 可以互操作。
- Nacos 这类注册中心可以承担 AgentCard 注册、发现和版本管理。
10.2 Nacos A2A Registry 配置示例
spring:
application:
name: product-agent-service
ai:
alibaba:
a2a:
nacos:
server-addr: ${NACOS_ADDR:127.0.0.1:8848}
username: ${NACOS_USERNAME:nacos}
password: ${NACOS_PASSWORD:nacos}
registry:
enabled: true
discovery:
enabled: true
server:
version: 1.0.0
card:
name: product-agent
description: 商品咨询、库存查询、相似商品推荐智能体
provider:
name: ecommerce-ai-platform
organization: mall-ai-team
生产建议:
server.card.name 要与 Agent Bean 名称或注册名称保持一致。
- 每个 Agent 服务独立部署,独立配置版本。
- 多环境通过 Nacos namespace 隔离。
- 灰度发布通过版本、标签或网关权重控制。
10.3 远程 Agent 适配器
Orchestrator 不应该感知远程调用细节,可以通过适配器把 A2A Remote Agent 包装成统一 Agent 接口。
package com.example.agent.remote;
import com.example.agent.core.Agent;
import com.example.agent.core.AgentDescriptor;
import com.example.agent.core.AgentRequest;
import com.example.agent.core.AgentResult;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
@Component
public class RemoteProductAgentAdapter implements Agent {
private final A2aAgentClient client;
public RemoteProductAgentAdapter(A2aAgentClient client) {
this.client = client;
}
@Override
public AgentDescriptor descriptor() {
return new AgentDescriptor(
"product-agent",
"1.0.0",
"Remote product agent via A2A",
Set.of("product_inventory", "product_recommendation"),
Set.of(),
Duration.ofSeconds(6),
500,
true,
com.example.agent.core.AgentStatus.UP
);
}
@Override
public AgentResult execute(AgentRequest request) {
RemoteAgentResponse response = client.send("product-agent", request.input(), request.context().threadId());
return new AgentResult(
descriptor().name(),
response.content(),
response.confidence(),
response.requiresHuman(),
Map.of("remote", true)
);
}
}
public interface A2aAgentClient {
RemoteAgentResponse send(String agentName, String input, String threadId);
}
public record RemoteAgentResponse(String content, double confidence, boolean requiresHuman) {
}
十一、Kubernetes 部署与弹性伸缩
11.1 Agent 独立部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: product-agent
spec:
replicas: 3
selector:
matchLabels:
app: product-agent
template:
metadata:
labels:
app: product-agent
spec:
containers:
- name: product-agent
image: registry.example.com/ai/product-agent:1.0.0
ports:
- containerPort: 8080
env:
- name: NACOS_ADDR
value: nacos.default.svc.cluster.local:8848
- name: SPRING_DATA_REDIS_HOST
value: redis-master.default.svc.cluster.local
- name: JAVA_TOOL_OPTIONS
value: "-XX:MaxRAMPercentage=75 -XX:+UseG1GC"
resources:
requests:
cpu: "500m"
memory: "512Mi"
limits:
cpu: "2"
memory: "2Gi"
readinessProbe:
httpGet:
path: /actuator/health/readiness
port: 8080
initialDelaySeconds: 15
periodSeconds: 5
livenessProbe:
httpGet:
path: /actuator/health/liveness
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
11.2 HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: product-agent-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: product-agent
minReplicas: 3
maxReplicas: 30
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 65
- type: Pods
pods:
metric:
name: agent_inflight_requests
target:
type: AverageValue
averageValue: "80"
生产中建议不要只按 CPU 扩容。AI Agent 往往瓶颈在外部 LLM API、工具服务、向量库或连接池,应该增加自定义指标:
agent_inflight_requests
agent_request_latency_p95
llm_provider_rate_limit_errors
tool_call_latency_p95
redis_context_latency_p95
十二、安全、风控与人机协同
12.1 高风险操作不要直接执行
以下操作应进入 Risk Agent 或 Human-in-the-loop:
- 取消订单。
- 发起退款。
- 修改地址。
- 发放优惠券。
- 查询敏感个人信息。
- 执行运维变更。
推荐模式:
12.2 操作型工具必须幂等
package com.example.agent.skill.order;
import org.springframework.ai.tool.annotation.Tool;
import org.springframework.ai.tool.annotation.ToolParam;
import org.springframework.stereotype.Component;
@Component
public class OrderActionTools {
private final OrderCommandService orderCommandService;
public OrderActionTools(OrderCommandService orderCommandService) {
this.orderCommandService = orderCommandService;
}
@Tool(name = "cancel_order", description = "取消未支付订单。执行前必须已经获得用户明确确认。")
public CancelOrderResult cancelOrder(
@ToolParam(description = "订单号") String orderNo,
@ToolParam(description = "幂等键,由 requestId + orderNo 生成") String idempotencyKey,
@ToolParam(description = "用户确认文本") String confirmationText
) {
if (!confirmationText.contains("确认取消")) {
return new CancelOrderResult(orderNo, false, "缺少用户明确确认,未执行取消");
}
return orderCommandService.cancel(orderNo, idempotencyKey);
}
}
public interface OrderCommandService {
CancelOrderResult cancel(String orderNo, String idempotencyKey);
}
public record CancelOrderResult(String orderNo, boolean success, String message) {
}
十三、可观测性与成本治理
没有可观测性的 Agent 系统,线上问题会非常难排查。至少要打通四类指标。
13.1 链路追踪
每次请求都要贯穿:
traceId -> sessionId -> orchestrationId -> agentName -> toolName -> llmRequestId
13.2 指标体系
| 指标 |
维度 |
作用 |
| agent.request.count |
agent、tenant、intent |
流量分析 |
| agent.request.latency |
agent、p50/p95/p99 |
性能瓶颈 |
| agent.tool.error |
tool、exception |
工具稳定性 |
| agent.llm.tokens |
model、agent、tenant |
成本核算 |
| agent.route.confidence |
strategy、intent |
路由质量 |
| agent.handoff.count |
tenant、intent |
人工兜底比例 |
13.3 Token 预算控制
package com.example.agent.cost;
import org.springframework.stereotype.Component;
@Component
public class TokenBudgetGuard {
private final TenantBudgetService budgetService;
public TokenBudgetGuard(TenantBudgetService budgetService) {
this.budgetService = budgetService;
}
public void check(String tenantId, String agentName, int estimatedTokens) {
TokenBudget budget = budgetService.currentBudget(tenantId);
if (budget.remainingTokens() < estimatedTokens) {
throw new TokenBudgetExceededException("Tenant token budget exceeded: " + tenantId);
}
}
}
public interface TenantBudgetService {
TokenBudget currentBudget(String tenantId);
}
public record TokenBudget(long remainingTokens) {
}
class TokenBudgetExceededException extends RuntimeException {
TokenBudgetExceededException(String message) {
super(message);
}
}
预算不足时的降级策略:
- 优先使用规则回复。
- 使用小模型做意图识别。
- 禁用非必要推荐 Agent。
- 缩短上下文窗口。
- 引导人工客服。
十四、常见线上问题与解决方案
14.1 Token 消耗突然暴涨
常见原因:
- 新增工具后所有 Agent 默认加载全部工具。
- 历史上下文没有压缩。
- RAG 召回片段过多。
- 模型路由失败导致多 Agent 重复调用。
解决方案:
- 工具按 Agent 和意图动态筛选。
- 启用工具搜索模式,按需展开工具定义。
- 设置单请求 Token 上限。
- 对 RAG 片段做 rerank 和长度裁剪。
- 按 Agent、租户、模型记录 token 指标。
14.2 Agent 间相互调用导致循环
解决方案:
- AgentRequest 中记录 visitedAgents。
- 限制最大调用深度,例如不超过 3 层。
- Orchestrator 统一调度,禁止业务 Agent 私自无限递归调用。
- 对循环调用做 trace 告警。
14.3 LLM 选择了错误工具
解决方案:
- 工具名称要明确,避免
query、search 这类模糊命名。
- 工具描述要写清楚适用场景和禁止场景。
- 对高风险工具增加规则前置判断。
- 使用工具参数增强记录模型调用理由。
- 对工具调用结果进行业务校验。
14.4 流式接口中途失败
解决方案:
- SSE 事件增加
event: error 和 event: done。
- 前端支持断线重试和会话恢复。
- 后端按 chunk 持久化关键状态。
- 对最终答案生成失败的场景返回事实型兜底答案。
14.5 Redis 上下文热点
解决方案:
- session key 加租户前缀并合理分片。
- 大对象拆分存储,摘要和最近消息分离。
- 热点会话本地短 TTL 缓存,但关键状态仍以 Redis 为准。
- 避免每个 token chunk 都写 Redis,改为阶段性 checkpoint。
十五、推荐落地路径
阶段一:单应用内模块化
适合 0 到 1 验证:
- 一个 Spring Boot 应用。
- 多个本地 Agent Bean。
- 每个 Agent 绑定有限工具。
- Redis 保存上下文。
- Micrometer 记录基本指标。
阶段二:核心 Agent 服务化
适合业务量开始增长:
- Orchestrator 独立服务。
- 高频 Agent 独立部署。
- Nacos 注册发现。
- Kafka 处理异步任务。
- K8s 独立扩容。
阶段三:企业级 Agent 平台
适合多团队、多业务线:
- A2A 统一 Agent 互操作。
- MCP 标准化外部工具接入。
- Agent 评估集与回归测试。
- Prompt、工具、模型版本统一治理。
- 成本预算与审计平台化。
十六、上线检查清单
上线前至少确认:
- 每个 Agent 有明确职责、版本、owner 和能力标签。
- 每个工具有权限控制、超时、限流、熔断和审计。
- 写操作工具具备幂等键和人工确认机制。
- 上下文有租户隔离、压缩策略和敏感信息脱敏。
- Orchestrator 有全链路 deadline 和降级策略。
- LLM 输出解析失败时有兜底。
- Agent 调用链有 traceId,能定位到具体工具和模型请求。
- token 成本按租户、Agent、模型可统计。
- 关键 Agent 有评估集和灰度发布机制。
- K8s 探针、HPA、限流和告警已配置。
十七、总结
把大模型当成"万能工具人",本质上是在用 Prompt 承担架构职责。这个思路可以做 Demo,却很难支撑生产。
生产级 Multi-Agent 的核心不是让模型更"聪明",而是让系统更"可控":
- 用 Agent 拆业务边界,降低单点认知负载。
- 用 Skills/Tools 承载能力边界,并加入治理能力。
- 用 Orchestrator 管理路由、并发、超时、降级和结果聚合。
- 用 Redis、Kafka、Nacos、Kubernetes 把智能体放进成熟的分布式工程体系。
- 用可观测性、评估集和成本预算,让 AI 系统可以持续运营。
Spring AI 的价值在于把 AI 能力带回 Java 工程体系;Spring AI Alibaba、A2A、MCP、Nacos 则进一步把 Agent 从单应用组件推向分布式协作单元。对于 Java 团队来说,Multi-Agent 的正确打开方式不是重新发明一套 AI 平台,而是把 Agent 当成新型微服务,用成熟的软件工程方法治理它。
参考资料
- Spring AI Tool Calling Reference
- Spring AI Dynamic Tool Discovery
- Spring AI Alibaba A2A Agent 文档
- Nacos A2A Registry 文档
- Spring AI A2A Integration Blog