找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

4623

积分

0

好友

635

主题
发表于 昨天 04:26 | 查看: 25| 回复: 0

当业务需求从简单的“问答”升级为复杂的“方案生成、任务拆解、跨角色协同与执行闭环”时,单一智能体的能力边界很快会暴露出来。原因其实很直接:

  • 单智能体擅长基于统一上下文推理,但很难同时兼顾多个专业角色的思维模式。
  • 复杂场景下上下文会急剧膨胀,导致提示词污染、角色混叠、输出不稳定等问题被放大。
  • 业务系统真正需要的往往不是“一段精彩的回答”,而是“可落地、可审计、可扩展、可治理”的最终成果。

因此,多智能体系统的核心价值,并不在于“让多个模型一起说话”,而在于将复杂的业务任务拆解成一个个可治理的工程单元,让不同角色的智能体以受控、协同的方式共同完成目标。本文将从企业架构和生产落地的视角,完整阐述如何基于 Spring AI Alibaba 构建一个用于“活动/营销方案策划”的多智能体系统,并重点解答以下四个关键的工程问题:

  • 为什么多智能体比单智能体更适合复杂的策划类任务?
  • 如何设计一个具备高并发、可扩展、可观测、可容错的多智能体架构?
  • Spring AI Alibaba 在生产环境中,应如何组织代码、配置模型并治理调用链路?
  • 如何将样例代码升级为真正可在线上稳定运行的生产级实现?

本文相关的讨论与代码实践,你可以在 云栈社区 找到更多同行的分享。

业务场景与问题拆解

真实业务背景

以一家大型营销服务公司为例,客户提交一次活动策划需求后,通常需要经历以下几个专业环节:

  • 创意策划:输出活动主题、核心亮点、互动玩法、传播爆点。
  • 财务规划:输出预算结构、成本测算、盈亏平衡点。
  • 执行设计:输出时间排期、资源配置、现场执行流程。
  • 风险控制:输出合规风险、舆情风险、供应链风险清单。
  • 方案汇总:将以上所有部分整合为一份可交付、可评审、可执行的正式方案。

传统模式下,这些环节通常由多个岗位的员工串行协作,导致一系列常见问题:

  • 周期长:专业角色必须串行作业,整体交付速度慢。
  • 协同成本高:需要多轮会议沟通对齐,信息损耗严重。
  • 质量不稳定:输出高度依赖个人经验,缺乏统一标准。
  • 扩展性差:面对业务高峰时,很难快速增加人手来扩容。
  • 知识沉淀弱:宝贵经验分散在不同的人和文档中,难以固化为可复用的系统能力。

为什么必须采用多智能体

“生成一份完整的策划方案”这类任务,本质上不是一个问答问题,而是一个复合型认知工作流,它具有以下鲜明特征:

  • 角色异构:创意人员、财务专家、执行经理的关注点和知识体系完全不同。
  • 目标存在潜在冲突:创意追求新颖和影响力,财务追求成本可控和利润率,执行追求确定性和可落地性。
  • 结果需要深度整合:最终输出必须是一份统一口径、逻辑自洽的方案,而不是几份答案的简单并列。
  • 过程需要严格审计:必须能清晰追溯每个子结论的来源、生成者、生成时间和决策依据。

这意味着系统设计绝不能只是“并发调用几个不同的 Prompt”。一个健壮的系统应当具备:

  • 任务拆解与规划能力
  • 角色隔离与专注能力
  • 结果整合与冲突消解能力
  • 失败隔离与恢复能力
  • 成本感知与控制能力
  • 全面的可观测与审计能力

多智能体系统的核心原理

多智能体不只是“多个 Bot”

多智能体系统的本质,是将一个复杂任务映射成一个受控的协同网络。它通常包含四类核心对象:

  • Planner(规划者):负责拆解总任务,并确定最佳的执行路径和参与角色。
  • Specialist Agent(专业智能体):每个智能体扮演一个特定专业角色,在限定的职责范围内产出高质量结果。
  • Orchestrator(编排器):负责任务调度、并发控制、超时治理、状态流转与上下文管理。
  • Synthesizer/Judge(整合者/评审者):负责聚合各子结果、消解潜在冲突、进行质量校验,并输出最终方案。

从架构视角看,一个成熟的多智能体系统更像“一个由 AI 驱动的分布式业务流程引擎”,而非简单的聊天机器人组合。

多智能体协作的三种常见模式

模式一:并行分工
适合子任务相互独立、无需频繁交互的场景,例如同时生成创意方案、财务测算和执行流程。

  • 优点:延迟低、易于扩展、结构清晰。
  • 缺点:子结果之间可能存在矛盾,需要后期整合。

模式二:串行增强
一个智能体的输出作为下一个智能体的输入,形成处理管道。例如,先生成初版创意,再由执行智能体评估落地性,最后由财务智能体测算预算。

  • 优点:结果一致性高,下游智能体可以基于上游结果进行优化。
  • 缺点:总时延长,且上游的错误或偏差会向下游传播。

模式三:规划 + 执行 + 裁决
先由 Planner 拆解任务,然后多个 Specialist Agent 并行或串行执行,最后由 Judge 进行评分、裁决或整合。这是企业级应用中最常用的模式,因为它兼顾了灵活性、可治理性、可观测性和质量控制。

本文所设计的方案策划系统,采用了一种 “规划弱化版 + 并行执行 + 统一整合” 的混合模式。在企业多数场景中,任务类型相对稳定,引入一个完全自治、决策复杂的 Planner 反而会带来额外的不确定性和成本。因此,我们通过配置化的方式预定义角色和协作流程。

整体架构设计

分层设计

从工程落地角度,建议将系统划分为 5 个层次:

1. 接入层
负责:

  • HTTP / OpenAPI 接入
  • 鉴权与身份验证
  • 请求幂等性保障
  • 流量限制与削峰
  • 灰度发布控制

2. 编排层
负责:

  • 任务拆解与上下文建立
  • 智能体选择与调度
  • 并发执行控制
  • 超时、重试与降级策略
  • 状态机流转与管理

3. 智能体层
负责:

  • 角色提示词(Prompt)的管理与渲染
  • 调用上下文的构造
  • 外部工具(如计算器、搜索)的调用
  • 模型输出的结构化处理
  • 领域特定规则的约束与检查

4. 领域服务层
负责:

  • 预算规则与成本模型
  • 活动模板库管理
  • 风险规则库
  • 审批流程约束
  • 业务知识库的检索与注入

5. 基础设施层
负责:

  • 大模型服务的调用与适配
  • 缓存(如 Redis)服务
  • 消息队列(如 Kafka)服务
  • 数据持久化(如 MySQL)
  • 监控告警体系
  • 配置中心

为什么要引入 Orchestrator

很多演示代码会把“并发调用多个 Agent”的逻辑直接写在 Web Controller 里,这在生产环境中是致命的,很快就会导致代码失控、难以维护和治理。

Orchestrator 的核心价值,在于将 AI 编排逻辑从业务接口中彻底剥离出来,使其成为一个独立的、可治理的组件。它至少应承担以下职责:

  • 建立统一的任务上下文,例如 requestIdtenantIdscenebudgetLimit
  • 根据业务场景动态选择需要参与的智能体集合。
  • 严格控制并发度,避免线程池或下游模型服务被瞬间打爆。
  • 对每个智能体的调用应用独立的超时、重试、熔断和隔离策略。
  • 聚合所有子任务的结果,并完整记录执行轨迹。
  • 输出统一、结构化的中间结果,供后续的整合器消费。

关键技术选型

技术栈建议

类别 技术 用途 选型理由
应用框架 Spring Boot 3.x Web 与服务框架 企业生态成熟,易于治理和集成
AI 集成 Spring AI Alibaba 模型接入、Prompt 编排 与 Spring 体系无缝集成,抽象统一
模型服务 通义千问或兼容 API 的模型 大模型推理 对中文场景支持好,易于国内企业接入
缓存 Redis 请求缓存、幂等控制、分布式锁 高性能,场景匹配度高
消息队列 Kafka / RocketMQ 异步任务、削峰填谷 实现解耦,支持高吞吐
持久化 MySQL / PostgreSQL 任务记录、审计日志存储 保证强一致性,便于后续分析
限流熔断 Resilience4j / Sentinel 容错治理 适合生产级高并发场景
监控 Micrometer + Prometheus + Grafana 指标监控与可视化 Spring 生态标准方案
链路追踪 OpenTelemetry 调用链与耗时分析 便于诊断智能体执行路径和瓶颈

为什么 Spring AI Alibaba 适合这类系统

相比手动编写 HTTP 客户端调用模型 API,Spring AI Alibaba 带来的价值是显而易见的:

  • 模型调用抽象统一:通过一致的编程接口对接不同模型提供商,便于未来切换或做灾备。
  • 自然的 AI 元素组织:可以更优雅地组织 Prompt、Message、Options 等 AI 编程核心元素。
  • 与 Spring 生态深度集成:能无缝对接 Spring Boot 的配置体系、Bean 生命周期、以及可观测体系(Actuator, Micrometer)。
  • 沉淀标准范式:有助于在企业内部形成标准化的 AI 应用开发模式和最佳实践。

但需要明确一点:Spring AI Alibaba 主要解决的是“模型接入和基础的 AI 编程抽象”问题,它并不直接提供“多智能体生产级编排”的能力。 编排、治理、幂等、容错、审计等生产级特性,仍然需要我们在应用层基于其能力进行设计和实现。

生产级领域建模

如果系统仅仅返回一段自然语言文本,将很难支撑后续的审计、回放、重试、质量分析等需求。因此,强烈建议从项目伊始就进行结构化领域建模。

核心对象

public enum AgentRole {
    CREATIVE,
    FINANCE,
    EXECUTION,
    RISK,
    JUDGE
}
public enum TaskStatus {
    PENDING,
    RUNNING,
    SUCCESS,
    PARTIAL_SUCCESS,
    FAILED,
    TIMEOUT
}
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import java.util.Map;

public record PlanningRequest(
        String requestId,
        String tenantId,
        String scene,
        String customerName,
        String requirement,
        BigDecimal budgetUpperLimit,
        Instant deadline,
        List<String> constraints,
        Map<String, Object> ext
) {
}
import java.time.Instant;

public record AgentTask(
        String taskId,
        String requestId,
        AgentRole role,
        String prompt,
        Instant createdAt
) {
}
import java.time.Instant;
import java.util.Map;

public record AgentResult(
        String taskId,
        AgentRole role,
        boolean success,
        String rawOutput,
        Map<String, Object> structuredOutput,
        String errorCode,
        String errorMessage,
        long latencyMs,
        Instant finishedAt
) {
}
import java.time.Instant;
import java.util.List;

public record PlanningResponse(
        String requestId,
        TaskStatus status,
        String finalPlan,
        List<AgentResult> agentResults,
        long totalLatencyMs,
        Instant finishedAt
) {
}

为什么必须结构化输出

强制要求智能体输出结构化数据,至少能解决四个现实的生产问题:

  1. 便于整合:整合器可以直接读取关键字段(如 totalBudget),无需再从自然语言中进行复杂的二次解析。
  2. 便于展示:前端可以轻松地分区、分模块展示不同智能体的贡献结果,用户体验更佳。
  3. 便于评估:方便离线进行方案质量评估、A/B 测试和效果回归分析。
  4. 便于补偿:在部分智能体失败的场景下,可以针对性地对特定结构化字段进行重试或补偿计算。

例如,财务智能体不应只返回一段“预算说明”文字,而应该输出结构化的:

  • totalBudget(总预算)
  • fixedCost(固定成本)
  • variableCost(可变成本)
  • riskReserve(风险预留金)
  • overBudgetItems(成本超限项列表)

生产级代码实现

以下实现的目标不是“最短可运行”,而是“接近企业级项目的真实写法”。

Agent 抽象层

import java.time.Duration;
import java.util.Map;

public interface AgentExecutor {

    AgentRole role();

    Duration timeout();

    AgentResult execute(PlanningRequest request, AgentTask task, Map<String, Object> sharedContext);
}
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.client.ChatClient;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public abstract class AbstractPlanningAgent implements AgentExecutor {

    protected final ChatClient chatClient;

    protected AbstractPlanningAgent(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    protected abstract String systemPrompt();

    protected abstract String userPromptTemplate();

    protected abstract Map<String, Object> buildVariables(PlanningRequest request, Map<String, Object> sharedContext);

    @Override
    public AgentResult execute(PlanningRequest request, AgentTask task, Map<String, Object> sharedContext) {
        Instant start = Instant.now();
        try {
            Map<String, Object> variables = new HashMap<>(buildVariables(request, sharedContext));
            PromptTemplate template = new PromptTemplate(userPromptTemplate(), variables);
            String renderedUserPrompt = template.render();

            ChatResponse response = chatClient.prompt(
                    new Prompt(
                            new SystemMessage(systemPrompt()),
                            new UserMessage(renderedUserPrompt)
                    )
            ).call().chatResponse();

            String content = response.getResult().getOutput().getText();
            Map<String, Object> structured = Map.of("content", content);

            return new AgentResult(
                    task.taskId(),
                    role(),
                    true,
                    content,
                    structured,
                    null,
                    null,
                    Duration.between(start, Instant.now()).toMillis(),
                    Instant.now()
            );
        } catch (Exception ex) {
            return new AgentResult(
                    task.taskId(),
                    role(),
                    false,
                    null,
                    Map.of(),
                    "AGENT_EXECUTION_ERROR",
                    ex.getMessage(),
                    Duration.between(start, Instant.now()).toMillis(),
                    Instant.now()
            );
        }
    }
}

专业智能体实现

创意 Agent

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;

@Component
public class CreativeAgent extends AbstractPlanningAgent {

    public CreativeAgent(ChatClient chatClient) {
        super(chatClient);
    }

    @Override
    public AgentRole role() {
        return AgentRole.CREATIVE;
    }

    @Override
    public Duration timeout() {
        return Duration.ofSeconds(12);
    }

    @Override
    protected String systemPrompt() {
        return """
                你是一名资深创意策划总监。
                你的职责是围绕业务目标输出可执行、可传播、具有记忆点的活动创意方案。
                输出时必须兼顾品牌调性、目标受众、传播节奏与落地可行性。
                不要讨论预算细节,不要越权做财务承诺。
                """;
    }

    @Override
    protected String userPromptTemplate() {
        return """
                请基于以下输入生成创意策划建议:
                - 场景:{scene}
                - 客户:{customerName}
                - 需求:{requirement}
                - 预算上限:{budgetUpperLimit}
                - 约束:{constraints}

                请输出:
                1. 活动主题
                2. 核心创意亮点
                3. 用户参与机制
                4. 传播建议
                5. 对执行和预算可能产生影响的注意事项
                """;
    }

    @Override
    protected Map<String, Object> buildVariables(PlanningRequest request, Map<String, Object> sharedContext) {
        return Map.of(
                "scene", request.scene(),
                "customerName", request.customerName(),
                "requirement", request.requirement(),
                "budgetUpperLimit", request.budgetUpperLimit(),
                "constraints", request.constraints()
        );
    }
}

财务 Agent

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;

@Component
public class FinanceAgent extends AbstractPlanningAgent {

    public FinanceAgent(ChatClient chatClient) {
        super(chatClient);
    }

    @Override
    public AgentRole role() {
        return AgentRole.FINANCE;
    }

    @Override
    public Duration timeout() {
        return Duration.ofSeconds(10);
    }

    @Override
    protected String systemPrompt() {
        return """
                你是一名企业活动预算规划专家。
                你的职责是输出成本结构、预算边界、风险预留和降本建议。
                你的目标是保证方案可交付且成本可控。
                不要发散讨论创意,不要输出空泛建议。
                """;
    }

    @Override
    protected String userPromptTemplate() {
        return """
                请基于以下输入生成预算方案:
                - 需求:{requirement}
                - 预算上限:{budgetUpperLimit}
                - 约束:{constraints}
                - 创意摘要:{creativeSummary}

                请输出:
                1. 预算分项表
                2. 高风险成本项
                3. 成本压缩建议
                4. 推荐预算区间
                5. 是否存在超预算风险
                """;
    }

    @Override
    protected Map<String, Object> buildVariables(PlanningRequest request, Map<String, Object> sharedContext) {
        return Map.of(
                "requirement", request.requirement(),
                "budgetUpperLimit", request.budgetUpperLimit(),
                "constraints", request.constraints(),
                "creativeSummary", sharedContext.getOrDefault("creativeSummary", "暂无")
        );
    }
}

执行 Agent

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;

@Component
public class ExecutionAgent extends AbstractPlanningAgent {

    public ExecutionAgent(ChatClient chatClient) {
        super(chatClient);
    }

    @Override
    public AgentRole role() {
        return AgentRole.EXECUTION;
    }

    @Override
    public Duration timeout() {
        return Duration.ofSeconds(10);
    }

    @Override
    protected String systemPrompt() {
        return """
                你是一名大型活动执行负责人。
                请从时间排期、人员分工、供应商协同、现场流程、应急预案等角度输出执行方案。
                输出必须强调落地性和可操作性。
                """;
    }

    @Override
    protected String userPromptTemplate() {
        return """
                请为以下活动需求输出执行方案:
                - 场景:{scene}
                - 需求:{requirement}
                - 截止时间:{deadline}
                - 创意摘要:{creativeSummary}
                - 约束:{constraints}

                请输出:
                1. 里程碑排期
                2. 人员与角色配置
                3. 供应商协作建议
                4. 执行风险点
                5. 应急处置方案
                """;
    }

    @Override
    protected Map<String, Object> buildVariables(PlanningRequest request, Map<String, Object> sharedContext) {
        return Map.of(
                "scene", request.scene(),
                "requirement", request.requirement(),
                "deadline", request.deadline(),
                "creativeSummary", sharedContext.getOrDefault("creativeSummary", "暂无"),
                "constraints", request.constraints()
        );
    }
}

编排器实现:高并发与容错核心

系统的真正关键不在于单个智能体,而在于编排器。下面给出一个生产导向的 AgentOrchestrator 示例,它实现了:

  • 并发执行
  • 单智能体超时控制
  • 失败隔离与异常处理
  • 部分成功结果返回
  • 智能体间的共享上下文注入
import org.springframework.stereotype.Service;

import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
public class AgentOrchestrator {

    private final Map<AgentRole, AgentExecutor> agentRegistry;
    private final Executor agentExecutorPool;
    private final ResultIntegrator resultIntegrator;

    public AgentOrchestrator(List<AgentExecutor> executors,
                             Executor agentExecutorPool,
                             ResultIntegrator resultIntegrator) {
        this.agentRegistry = executors.stream()
                .collect(Collectors.toMap(AgentExecutor::role, it -> it, (a, b) -> a, () -> new EnumMap<>(AgentRole.class)));
        this.agentExecutorPool = agentExecutorPool;
        this.resultIntegrator = resultIntegrator;
    }

    public PlanningResponse plan(PlanningRequest request) {
        Instant start = Instant.now();

        Map<String, Object> sharedContext = new java.util.concurrent.ConcurrentHashMap<>();
        List<AgentRole> roles = List.of(AgentRole.CREATIVE, AgentRole.FINANCE, AgentRole.EXECUTION, AgentRole.RISK);
        List<CompletableFuture<AgentResult>> futures = new ArrayList<>();

        for (AgentRole role : roles) {
            AgentExecutor executor = agentRegistry.get(role);
            AgentTask task = new AgentTask(
                    UUID.randomUUID().toString(),
                    request.requestId(),
                    role,
                    "generated-by-orchestrator",
                    Instant.now()
            );

            CompletableFuture<AgentResult> future = CompletableFuture
                    .supplyAsync(() -> executor.execute(request, task, sharedContext), agentExecutorPool)
                    .orTimeout(executor.timeout().toMillis(), TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> new AgentResult(
                            task.taskId(),
                            role,
                            false,
                            null,
                            Map.of(),
                            "TIMEOUT_OR_FAILURE",
                            ex.getMessage(),
                            executor.timeout().toMillis(),
                            Instant.now()
                    ));

            future = future.thenApply(result -> {
                if (result.success() && role == AgentRole.CREATIVE) {
                    sharedContext.put("creativeSummary", result.rawOutput());
                }
                return result;
            });

            futures.add(future);
        }

        List<AgentResult> results = futures.stream().map(CompletableFuture::join).toList();
        String finalPlan = resultIntegrator.integrate(request, results);

        boolean allSuccess = results.stream().allMatch(AgentResult::success);
        boolean anySuccess = results.stream().anyMatch(AgentResult::success);

        TaskStatus status = allSuccess ? TaskStatus.SUCCESS :
                anySuccess ? TaskStatus.PARTIAL_SUCCESS : TaskStatus.FAILED;

        return new PlanningResponse(
                request.requestId(),
                status,
                finalPlan,
                results,
                java.time.Duration.between(start, Instant.now()).toMillis(),
                Instant.now()
        );
    }
}

结果整合器:从“拼接文本”升级到“冲突消解”

很多文章中的“整合器”只是把多个结果拼在一起,再扔给大模型重新总结一遍。这种做法的问题是:

  • 无法识别“创意方案”与“预算限制”之间的具体冲突。
  • 无法判断“执行计划”是否真的可落地。
  • 无法确认“风险项”是否在最终方案中被有效覆盖。

生产级的整合器至少应该做两层处理:

第一层:规则整合

  • 检查预算是否超过客户设定的上限。
  • 核对执行排期是否晚于最终截止时间。
  • 识别并标记被风控智能体标出的高风险项。

第二层:模型整合

  • 在满足第一层规则约束的前提下,生成一份完整、流畅、专业的最终方案。
  • 显式输出“冲突项与决策理由”,例如“创意中的XX环节因预算超限已调整为YY方案”。
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.messages.SystemMessage;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

@Component
public class ResultIntegrator {

    private final ChatClient chatClient;

    public ResultIntegrator(ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    public String integrate(PlanningRequest request, List<AgentResult> results) {
        String agentOutputs = results.stream()
                .map(it -> "## " + it.role() + "\n"
                        + "success=" + it.success() + "\n"
                        + "latencyMs=" + it.latencyMs() + "\n"
                        + "content=" + (it.rawOutput() == null ? "N/A" : it.rawOutput()))
                .collect(Collectors.joining("\n\n"));

        String prompt = """
                你是一名资深方案总监,请将多角色智能体结果整合为一份正式交付方案。
                整合原则:
                1. 优先保证可执行性和预算可控
                2. 如果创意与预算冲突,保留创意方向并给出降配方案
                3. 如果执行与时间冲突,必须显式指出并调整排期
                4. 输出需包含:背景目标、总体方案、预算建议、执行计划、风险控制、结论

                原始需求:
                %s

                各角色输出:
                %s
                """.formatted(request.requirement(), agentOutputs);

        return chatClient.prompt(
                new Prompt(
                        new SystemMessage("你是一名严谨的企业方案整合专家。"),
                        new UserMessage(prompt)
                )
        ).call().content();
    }
}

风险 Agent 示例

import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;

@Component
public class RiskAgent extends AbstractPlanningAgent {

    public RiskAgent(ChatClient chatClient) {
        super(chatClient);
    }

    @Override
    public AgentRole role() {
        return AgentRole.RISK;
    }

    @Override
    public Duration timeout() {
        return Duration.ofSeconds(8);
    }

    @Override
    protected String systemPrompt() {
        return """
                你是一名企业级风控顾问。
                请识别活动在合规、舆情、供应链、执行安全上的风险,并给出优先级排序和缓解措施。
                """;
    }

    @Override
    protected String userPromptTemplate() {
        return """
                请基于以下信息输出风险评估:
                - 场景:{scene}
                - 需求:{requirement}
                - 约束:{constraints}

                请输出:
                1. Top5 风险项
                2. 风险等级
                3. 触发条件
                4. 缓解措施
                5. 必须人工复核的部分
                """;
    }

    @Override
    protected Map<String, Object> buildVariables(PlanningRequest request, Map<String, Object> sharedContext) {
        return Map.of(
                "scene", request.scene(),
                "requirement", request.requirement(),
                "constraints", request.constraints()
        );
    }
}

控制器与接口设计

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@RestController
@RequestMapping("/api/planning")
public class PlanningController {

    private final AgentOrchestrator orchestrator;

    public PlanningController(AgentOrchestrator orchestrator) {
        this.orchestrator = orchestrator;
    }

    @PostMapping("/generate")
    public ResponseEntity<PlanningResponse> generate(
            @RequestHeader(value = "X-Request-Id", required = false) String requestId,
            @Valid @RequestBody CreatePlanningCommand command) {

        PlanningRequest request = new PlanningRequest(
                requestId == null ? UUID.randomUUID().toString() : requestId,
                command.tenantId(),
                command.scene(),
                command.customerName(),
                command.requirement(),
                command.budgetUpperLimit(),
                command.deadline(),
                command.constraints(),
                Map.of()
        );

        return ResponseEntity.ok(orchestrator.plan(request));
    }

    public record CreatePlanningCommand(
            @NotBlank String tenantId,
            @NotBlank String scene,
            @NotBlank String customerName,
            @NotBlank String requirement,
            @NotNull BigDecimal budgetUpperLimit,
            @NotNull Instant deadline,
            List<String> constraints
    ) {
    }
}

高并发设计与工程化升级

并发瓶颈真正在哪里

多智能体系统的瓶颈通常不只在于 CPU 计算,而主要集中在这几个环节:

  • 模型调用时延:调用大模型 API 通常是毫秒到秒级,是主要的耗时来源。
  • 外部 API QPS 限制:模型服务商通常有严格的速率限制。
  • 上下文组装和序列化开销:构建庞大的 Prompt 并进行网络传输需要时间。
  • 线程池排队与上下文切换:不当的线程池配置会导致任务排队。
  • 网络 IO:与模型服务、数据库、缓存等的网络通信延迟。
  • 结果整合阶段的二次推理:整合器调用大模型进行总结,可能成为新的瓶颈。

因此,优化方向不能只盯着“把线程池调大”,而应该从系统视角治理整条调用链路。

线程池隔离

切勿让所有智能体共用一个无边界线程池。建议至少做到:

  • 编排线程池与 Web 业务线程池分离:避免 AI 任务阻塞常规 HTTP 请求处理。
  • 高低成本智能体分池隔离:将耗时长、成本高的智能体(如复杂创意生成)与轻量级智能体(如风险初筛)隔离,防止前者影响后者。
  • 设置有界队列与合理的拒绝策略:例如 CallerRunsPolicy,在队列满时由调用者线程执行,起到负反馈作用。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ExecutorConfig {

    @Bean
    public Executor agentExecutorPool() {
        return new ThreadPoolExecutor(
                16,
                32,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(500),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

限流、熔断与重试

生产环境中,最常见的问题不是“代码写错”,而是下游模型服务不稳定、响应时间抖动、触发限流。建议在每个智能体调用外层集成 Resilience4j 等容错库:

  • TimeLimiter:严格限制单次调用的最大时长。
  • CircuitBreaker:当下游服务异常率升高时,快速失败,避免雪崩。
  • Bulkhead:限制单个智能体或一类智能体占用的最大并发数。
  • Retry:仅对幂等请求(如纯查询)进行有限次数(如1-2次)的重试。

一个核心原则是:超时控制应优先于重试。对于耗时长、非幂等的长文本生成任务,盲目重试会显著增加成本和延迟。

缓存与幂等

对于方案策划类任务,缓存不能简单地基于原始需求文本,必须考虑多维度:

  • 租户维度 (tenantId)
  • 业务场景维度 (scene)
  • Prompt 模板版本 (promptVersion)
  • 所选用的大模型版本 (modelVersion)
  • 关键业务约束是否一致

推荐的缓存 Key 格式:

planning:{tenantId}:{scene}:{modelVersion}:{promptVersion}:{requestHash}

幂等性建议:

  • 要求前端或调用方传入 X-Request-Id
  • 服务端首先查询幂等表(可利用 Redis 或数据库)。
  • 若请求已处理完成,则直接返回历史结果。
  • 若请求正在处理中,则返回“任务进行中”状态,并提供查询接口。

异步化与削峰填谷

当请求量剧增后,不建议所有请求都同步等待完整方案生成。推荐两种模式并存:

同步模式
适用于轻量级方案生成、内部工具台、交互式操作等对实时性要求高的场景。

异步模式
适用于大型复杂方案、流量峰值场景、批量任务处理。
异步流程建议:

  1. API 接收请求,生成唯一任务ID并落库,状态置为PENDING
  2. 将任务信息投递到消息队列(如 Kafka)。
  3. 独立的 Worker 服务消费消息,调用 Orchestrator 执行任务。
  4. 将执行结果和状态回写至数据库。
  5. 通过回调 URL、WebSocket 或客户端轮询任务状态接口来获取最终结果。

可观测性与生产治理

指标体系

一个多智能体系统至少要监控以下核心指标:

  • 请求总量成功率部分成功率
  • 平均响应时间P95/P99 响应时间
  • Agent独立成功率平均耗时
  • Agent模型 Token 消耗量(估算成本)。
  • 重试次数超时次数熔断器状态
  • 缓存命中率
  • 单租户调用量预估成本

Micrometer 埋点示例

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class AgentMetrics {

    private final MeterRegistry registry;

    public AgentMetrics(MeterRegistry registry) {
        this.registry = registry;
    }

    public void recordSuccess(AgentRole role, long latencyMs) {
        Timer.builder("agent.execution.latency")
                .tag("role", role.name())
                .register(registry)
                .record(latencyMs, TimeUnit.MILLISECONDS);

        Counter.builder("agent.execution.success")
                .tag("role", role.name())
                .register(registry)
                .increment();
    }

    public void recordFailure(AgentRole role, String errorCode) {
        Counter.builder("agent.execution.failure")
                .tag("role", role.name())
                .tag("errorCode", errorCode == null ? "UNKNOWN" : errorCode)
                .register(registry)
                .increment();
    }
}

链路追踪

建议将以下字段作为 Trace 上下文,贯穿整个调用链路:

  • requestId (请求唯一标识)
  • tenantId (租户标识)
  • scene (业务场景)
  • agentRole (当前执行的智能体角色)
  • taskId (子任务标识)
  • modelName (使用的大模型名称)

这样,当出现问题时,运维和开发人员能够快速定位:

  • 哪个租户的请求失败率最高?
  • 哪个智能体的执行最慢,成了瓶颈?
  • 哪个模型版本的响应时间波动最大?
  • 哪个业务场景最容易触发异常?

审计与回放

企业级应用常常需要回答“为什么最终方案是这样的?”。因此,必须保留完整的审计信息,包括:

  • 原始请求的所有参数。
  • 使用的 Prompt 模板及其版本
  • 调用的 大模型版本
  • 每个 智能体的原始输出和结构化输出
  • 整合器的最终输出
  • 每个步骤的 精确时间戳
  • 触发本次调用的 操作人或系统标识

这些数据将支撑以下关键场景:

  • 合规审计:满足内控和外部审计要求。
  • 线上问题回放:精准复现问题现场,辅助调试。
  • 质量复盘:分析方案质量波动的原因。
  • Prompt/模型升级效果对比:通过 A/B 测试数据评估新版本效果。

部署方案

Dockerfile

FROM eclipse-temurin:17-jre
WORKDIR /app
COPY target/planning-service.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-XX:+UseContainerSupport", "-XX:MaxRAMPercentage=75.0", "-jar", "app.jar"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: planning-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: planning-service
  template:
    metadata:
      labels:
        app: planning-service
    spec:
      containers:
        - name: planning-service
          image: planning-service:1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: prod
            - name: SPRING_AI_ALIBABA_QWEN_API_KEY
              valueFrom:
                secretKeyRef:
                  name: llm-secret
                  key: api-key
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "4Gi"
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8080

横向扩缩容建议

扩容策略不应只关注 CPU 使用率,建议综合以下信号进行决策:

  • 接口 QPS 是否持续高于阈值。
  • 消息队列(MQ)的积压(backlog) 是否持续增长。
  • 智能体平均耗时 是否因排队而显著增加。
  • 下游模型服务 的限流错误(429)是否增多。

如果仅根据应用实例的 CPU 使用率扩容,常常会出现“应用实例数量上去了,但下游模型服务的 QPS 限制仍然是瓶颈”的假性扩容。

总结

多智能体系统真正的难点,从来不只是“如何并行调用多个模型”,而是如何将不确定的 AI 能力,嵌入到一个确定性的、可靠的工程框架之中。

从实践经验看,一个生产可用的多智能体方案策划系统至少要清晰回答以下问题:

  • 如何拆解任务,确保角色职责清晰、避免混叠?
  • 如何编排执行,在追求高并发的同时保障系统稳定性?
  • 如何实施治理,包括超时、重试、熔断、降级策略?
  • 如何保证结果是可整合、可审计、可回放的?
  • 如何在高并发下,依然保持成本和响应时间在可控范围内?

Spring AI Alibaba 为我们提供了优秀的模型接入与应用开发基础,但真正决定系统高度和稳定性的,仍然是顶层的架构设计、细致的工程治理和丰富的生产运维经验。

如果将本文的核心思想浓缩为一句话,那就是:

多智能体不是“多调用几个大模型”,而是“把复杂的业务流程,重构为一套可治理、可扩展的 AI 协同系统”。

这也是多智能体技术在企业级场景中真正闪耀价值的地方。




上一篇:从备胎到传奇:8086处理器逆袭史话
下一篇:Spring AI 实战:构建高并发可扩展的智能行程规划Agent系统
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-7 18:38 , Processed in 0.842780 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表