找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2799

积分

0

好友

375

主题
发表于 昨天 03:14 | 查看: 5| 回复: 0

在AIGC应用快速落地的这几年,很多团队在做“故事创作”“营销文案”“教育内容生成”“游戏剧情设计”等业务时,最先采用的通常是单次Prompt加单模型调用的实现方式。这种方式适合PoC(概念验证),但一旦进入真实生产环境,很快会暴露出三个问题:

  1. 单次生成链路过长,Prompt越写越复杂,质量不可控。
  2. 高并发场景下,模型调用耗时高、成本高,系统吞吐难以提升。
  3. 业务需求不断扩展后,能力边界不清晰,系统难以维护和演进。

故事创作平台尤其典型。一次高质量的故事生成,通常并不是“一个模型一次输出”就能完成,而是由多个相对独立但又相互关联的任务组成:

  • 题材理解与意图澄清
  • 世界观和角色设定
  • 情节大纲规划
  • 分章节正文生成
  • 语言风格统一
  • 安全审核与质量评估
  • 用户偏好记忆与二次编辑

这类任务天然适合拆分为多个智能体(Agent)协作完成。每个Agent负责单一职责,通过调度器完成编排、状态传递与结果汇总。这样可以把“大模型黑盒问题”转化为“可拆解、可观测、可扩展的工程系统”。

本文将基于 AgentScope Java + Spring Boot + Kafka + Redis + PostgreSQL + Kubernetes 这套企业级后端 & 架构技术栈,系统讲解如何构建一个面向生产环境的高并发故事创作平台。重点不只在“怎么跑起来”,更在于:

  • 多智能体协作的核心原理
  • 高并发与可扩展架构设计
  • 生产级代码组织与治理策略
  • 真实业务场景下的工程落地方法

业务目标与系统约束

业务目标

我们假设平台面向以下场景:

  • C端用户在Web/App中发起“故事创作”请求
  • 支持题材、风格、目标读者、篇幅、角色设定等多维输入
  • 支持秒级返回任务受理结果,异步查看创作进度
  • 支持章节化生成、续写、重写、局部润色
  • 支持高峰期每秒数百到数千个创作请求

核心技术约束

相比传统CRUD系统,多智能体故事平台还有几类额外约束:

  • 外部依赖不稳定:LLM API可能超时、限流、偶发失败
  • 生成质量非确定性:同样输入不一定得到完全一致输出
  • 长链路编排:一次请求涉及多个Agent和多个中间状态
  • 成本敏感:Prompt长度、模型选择、重试策略都会影响成本
  • 可观测性要求高:必须知道每一步耗时、失败点、Token消耗

因此,这不是一个“简单调用大模型”的系统,而是一个兼顾业务质量、性能、稳定性和成本的分布式协作系统。

为什么选择 AgentScope Java

AgentScope Java 的价值不在于“再封装一次LLM”,而在于它更适合构建有明确协作边界的多Agent运行时。对于Java技术栈团队而言,它有几个现实优势:

  • 更容易融入现有Spring Boot微服务体系
  • 更适合与Kafka、Redis、数据库、中台能力打通
  • 更符合企业级团队对类型安全、可测试性、工程规范的要求
  • 更方便做线程池治理、连接池治理、链路监控和资源隔离

从工程视角看,AgentScope Java适合作为“智能体运行时与协作框架”,而不是替代整个业务系统。更合理的定位是:

  • Spring Boot负责API、配置、生命周期和工程基础设施
  • AgentScope Java负责Agent定义、消息传递、协作编排
  • Kafka/Redis/DB负责异步解耦、状态存储与缓存
  • Kubernetes负责部署、弹性扩缩容和故障恢复

生产级架构总览

分层设计

可以把系统拆成五层:

  1. 接入层
    包括API Gateway、鉴权、限流、灰度发布、Trace注入。
  2. 应用层
    提供故事创建、续写、重写、查询进度、获取结果等接口。
  3. 编排层
    由Story Orchestrator负责DAG(有向无环图)编排、并发调度、超时控制、失败补偿。
  4. Agent执行层
    不同Agent负责特定能力,如情节、角色、章节、风格、审核。
  5. 基础设施层
    Redis、Kafka、PostgreSQL、对象存储、监控、Kubernetes。

为什么不是“所有Agent都同步调用”

很多初版系统会把Agent编排写成一串同步调用:

Intent -> Plot -> Character -> Chapter -> Style -> Review

这种设计在功能上可行,但在生产上问题很大:

  • 整个接口耗时取决于最长链路,用户体验差
  • 任一步超时都会拖慢整体响应
  • Web线程被长时间占用,吞吐快速下降
  • 失败恢复困难,中间结果无法复用

更合理的方式是:

  • 接口层只做任务受理,快速返回 taskId
  • 编排层异步驱动任务状态机
  • 可并行的Agent尽量并行
  • 中间结果持久化,支持断点续跑
  • 通过事件驱动实现弹性扩展

多智能体协作原理:从链式调用到DAG编排

多智能体系统的本质

多智能体系统并不是“多个模型实例同时跑”,而是将复杂任务分解成一组可治理的子任务,并通过标准协议在Agent之间传递上下文、约束与产出。

在故事创作场景中,推荐使用 DAG编排 而不是单纯串行:
其中:

  • 用户需求解析 必须最先执行
  • 情节规划角色设定 可以并行
  • 章节生成 依赖前两者结果
  • 风格统一质量审核 在后置阶段处理

这类DAG设计的收益非常明显:

  • 减少关键路径耗时
  • 明确依赖关系,便于失败重试
  • 支持节点级扩容和独立优化
  • 更适合做任务级可观测性

上下文管理原则

多智能体协作最大的隐患之一是上下文失控。一个Agent输出过长,会直接导致后续Prompt膨胀、成本升高、质量下降。

实践中要遵守三条原则:

  1. 结构化传递,不传自由文本大杂烩
    Agent之间尽量传JSON结构,而不是整段自然语言。
  2. 只传必要上下文,不传全部历史
    章节生成只需要角色卡、世界观、大纲片段,不需要完整系统日志。
  3. 中间结果持久化,按需加载
    上下文不要总放在内存里,应支持从Redis或数据库按阶段恢复。

核心领域模型设计

故事任务实体

在生产系统里,首先要把“故事创作请求”建模为任务,而不是一次HTTP调用。

public class StoryTask {
    private Long id;
    private String taskId;
    private String userId;
    private String genre;
    private String theme;
    private String audience;
    private Integer targetWords;
    private String status;
    private String currentStage;
    private Integer progress;
    private String resultVersion;
    private Instant createdAt;
    private Instant updatedAt;
}

这里的关键字段有:

  • statusPENDINGRUNNINGPARTIAL_SUCCESSSUCCESSFAILED
  • currentStage:当前运行到哪一个Agent阶段
  • progress:便于前端轮询或推送展示
  • resultVersion:支持重写、续写、多版本编辑

上下文快照模型

为了支持断点续跑和问题排查,建议维护上下文快照:

public class StoryContextSnapshot {
    private String taskId;
    private String stage;
    private String payloadJson;
    private String promptTemplateVersion;
    private String modelName;
    private Integer promptTokens;
    private Integer completionTokens;
    private Long latencyMs;
    private Boolean success;
    private String errorCode;
    private String errorMessage;
    private Instant createdAt;
}

这类快照非常重要,它决定了系统是否真正可运维:

  • 线上质量回溯依赖它
  • Prompt优化依赖它
  • 成本分析依赖它
  • 失败补偿和灰度回滚依赖它

生产级工程架构设计

服务拆分建议

随着规模增长,建议不要把所有逻辑都塞进一个服务。比较合理的拆分如下: 服务 核心职责 是否无状态
story-api-service 对外API、鉴权、任务创建、结果查询
story-orchestrator-service DAG编排、状态机推进、超时控制
story-agent-worker 执行具体Agent任务
llm-gateway-service 模型路由、熔断、限流、降级、审计
story-query-service 查询聚合、报表、运营接口

在早期可以合并部署,但代码层面最好提前隔离职责,这样后续拆服务成本最低。

事件驱动架构

建议使用Kafka将“任务创建”和“阶段推进”事件解耦。

事件主题可设计为:

  • story.task.created
  • story.stage.intent.completed
  • story.stage.plot.completed
  • story.stage.character.completed
  • story.stage.chapter.completed
  • story.stage.review.completed
  • story.task.failed

这种设计的价值在于:

  • 控制面和执行面解耦
  • Worker可以水平扩展
  • 下游失败不会直接拖垮上游接口
  • 支持灵活插入新Agent,例如“敏感内容审核Agent”

状态机设计

复杂任务不建议只靠“if-else”推进。更推荐显式状态机:

状态机好处有三个:

  • 便于限定非法状态流转
  • 便于重试和补偿策略统一
  • 便于监控告警和运营统计

生产级代码实现:从API到编排

下面给出一套更接近真实生产项目的代码组织方式。

API入参和响应对象

public record CreateStoryCommand(
        @NotBlank String userId,
        @NotBlank String genre,
        @NotBlank String theme,
        @NotBlank String audience,
        @Min(500) @Max(20000) Integer targetWords,
        List<String> keywords,
        String preferredStyle,
        Boolean allowStreaming
) {}
public record CreateStoryResponse(
        String taskId,
        String status,
        String message
) {}

这里要注意,API层只负责接收和校验,不直接承载复杂业务逻辑。

Controller:快速受理,避免同步阻塞

@RestController
@RequestMapping("/api/v1/stories")
@RequiredArgsConstructor
public class StoryController {

    private final StoryApplicationService storyApplicationService;

    @PostMapping
    public ResponseEntity<CreateStoryResponse> createStory(
            @Valid @RequestBody CreateStoryCommand command) {
        CreateStoryResponse response = storyApplicationService.createStory(command);
        return ResponseEntity.accepted().body(response);
    }

    @GetMapping("/{taskId}")
    public ResponseEntity<StoryTaskDetailVO> getTaskDetail(@PathVariable String taskId) {
        return ResponseEntity.ok(storyApplicationService.getTaskDetail(taskId));
    }
}

设计重点:

  • 返回 202 Accepted,说明任务已受理但未完成
  • taskId 作为后续查询和推送订阅主键
  • 避免把HTTP请求和长耗时LLM调用绑死

应用服务:创建任务并投递事件

@Service
@RequiredArgsConstructor
public class StoryApplicationService {

    private final StoryTaskRepository storyTaskRepository;
    private final StoryEventPublisher storyEventPublisher;
    private final IdGenerator idGenerator;

    @Transactional
    public CreateStoryResponse createStory(CreateStoryCommand command) {
        String taskId = idGenerator.nextTaskId();

        StoryTaskEntity entity = StoryTaskEntity.builder()
                .taskId(taskId)
                .userId(command.userId())
                .genre(command.genre())
                .theme(command.theme())
                .audience(command.audience())
                .targetWords(command.targetWords())
                .status(TaskStatus.PENDING.name())
                .currentStage("INIT")
                .progress(0)
                .build();

        storyTaskRepository.save(entity);

        storyEventPublisher.publishTaskCreated(new StoryTaskCreatedEvent(
                taskId,
                command.userId(),
                command.genre(),
                command.theme(),
                command.audience(),
                command.targetWords(),
                command.keywords(),
                command.preferredStyle()
        ));

        return new CreateStoryResponse(taskId, "PENDING", "故事创作任务已受理");
    }
}

这段代码体现了生产系统里非常重要的一点:先落库,再发事件
更严谨的做法是结合Outbox Pattern,避免数据库成功但消息发送失败导致数据不一致。

多智能体编排器设计

Orchestrator的职责边界

编排器不是“另一个大而全的业务服务”,它的职责应严格聚焦于:

  • 维护任务状态机
  • 决定下一步该执行哪个Agent
  • 管理阶段依赖关系
  • 控制超时、重试、降级、补偿
  • 汇总阶段结果并产出最终结果

不要把Prompt拼装、模型调用细节、数据库复杂查询都塞进Orchestrator,否则它会很快变成新的单点复杂系统。

基于CompletableFuture的并行编排

在某些场景下,单个节点内部也可以做并行执行,比如 PlotAgentCharacterAgent 在意图解析完成后并行运行。

@Service
@RequiredArgsConstructor
public class StoryOrchestrator {

    private final IntentAgent intentAgent;
    private final PlotAgent plotAgent;
    private final CharacterAgent characterAgent;
    private final ChapterAgent chapterAgent;
    private final StyleAgent styleAgent;
    private final ReviewAgent reviewAgent;
    private final StoryContextRepository storyContextRepository;
    private final Executor storyExecutor;

    public StoryResult orchestrate(StoryCreationContext context) {
        IntentResult intentResult = intentAgent.execute(context);
        StoryCreationContext enriched = context.withIntentResult(intentResult);

        CompletableFuture<PlotResult> plotFuture =
                CompletableFuture.supplyAsync(() -> plotAgent.execute(enriched), storyExecutor);

        CompletableFuture<CharacterResult> characterFuture =
                CompletableFuture.supplyAsync(() -> characterAgent.execute(enriched), storyExecutor);

        PlotResult plotResult = plotFuture.join();
        CharacterResult characterResult = characterFuture.join();

        StoryCreationContext chapterContext = enriched
                .withPlotResult(plotResult)
                .withCharacterResult(characterResult);

        ChapterDraft draft = chapterAgent.execute(chapterContext);
        StyledStory styledStory = styleAgent.execute(chapterContext.withDraft(draft));
        ReviewResult reviewResult = reviewAgent.execute(chapterContext.withStyledStory(styledStory));

        return StoryResult.builder()
                .plot(plotResult.plot())
                .characters(characterResult.characters())
                .story(reviewResult.finalStory())
                .reviewTags(reviewResult.tags())
                .build();
    }
}

这里虽然是同步式样例,但它体现了生产实现的两个关键点:

  • 依赖前的并行化
  • 统一上下文对象传递

真正的线上做法:事件驱动 + 幂等执行

如果系统目标是高并发,编排不应该完全依赖本地线程。更推荐的做法是:

  • Orchestrator根据任务状态发阶段消息
  • Worker订阅消息并执行具体Agent
  • 执行结果再回写阶段完成事件
  • Orchestrator决定是否推进到下个阶段

这样即使单个Pod宕机,也不会导致任务整体丢失。

Agent设计原则与实现模式

Agent的职责设计

每个Agent都应该满足以下约束:

  • 输入边界清晰
  • 输出结构稳定
  • Prompt模板可版本化
  • 错误码明确
  • 可独立测试

不推荐让Agent既负责“生成内容”,又负责“数据持久化”,又负责“路由下游”。Agent更像一个纯能力节点。

抽象Agent基类

public abstract class BaseStoryAgent<I, O> {

    protected final LlmGatewayClient llmGatewayClient;
    protected final PromptTemplateEngine promptTemplateEngine;

    protected BaseStoryAgent(LlmGatewayClient llmGatewayClient,
                             PromptTemplateEngine promptTemplateEngine) {
        this.llmGatewayClient = llmGatewayClient;
        this.promptTemplateEngine = promptTemplateEngine;
    }

    public O execute(StoryCreationContext context) {
        I request = buildRequest(context);
        validateRequest(request);

        String prompt = promptTemplateEngine.render(templateName(), request);
        LlmResponse response = llmGatewayClient.generate(buildLlmRequest(prompt));

        O result = parseResponse(response);
        validateResult(result);
        return result;
    }

    protected abstract String templateName();

    protected abstract I buildRequest(StoryCreationContext context);

    protected abstract O parseResponse(LlmResponse response);

    protected abstract void validateRequest(I request);

    protected abstract void validateResult(O result);

    protected LlmRequest buildLlmRequest(String prompt) {
        return LlmRequest.builder()
                .model("gpt-4.1")
                .temperature(0.7)
                .maxTokens(2000)
                .prompt(prompt)
                .build();
    }
}

这个抽象层的意义在于统一Agent行为:

  • Prompt渲染方式统一
  • 模型调用入口统一
  • 结果校验逻辑统一
  • 后续做埋点、审计、熔断时更容易扩展

示例:PlotAgent

@Component
public class PlotAgent extends BaseStoryAgent<PlotRequest, PlotResult> {

    public PlotAgent(LlmGatewayClient llmGatewayClient,
                     PromptTemplateEngine promptTemplateEngine) {
        super(llmGatewayClient, promptTemplateEngine);
    }

    @Override
    protected String templateName() {
        return "plot-agent-v3";
    }

    @Override
    protected PlotRequest buildRequest(StoryCreationContext context) {
        return new PlotRequest(
                context.genre(),
                context.theme(),
                context.audience(),
                context.intentResult().coreExpectation(),
                context.keywords()
        );
    }

    @Override
    protected PlotResult parseResponse(LlmResponse response) {
        return JsonUtils.fromJson(response.content(), PlotResult.class);
    }

    @Override
    protected void validateRequest(PlotRequest request) {
        Assert.hasText(request.genre(), "genre must not be empty");
        Assert.hasText(request.theme(), "theme must not be empty");
    }

    @Override
    protected void validateResult(PlotResult result) {
        if (result == null || result.outline() == null || result.outline().isEmpty()) {
            throw new AgentExecutionException("PLOT_EMPTY", "plot result is empty");
        }
    }
}

这个版本与“简单直接调用LLM”的差别是,它已经具备生产可演进的基本特征:

  • 模板版本可管理
  • 输入输出结构清晰
  • 校验失败有明确异常
  • Agent可单测,可灰度,可替换

LLM Gateway:生产环境中的关键中台

很多团队把模型调用直接散落在各个业务服务中,这在初期很快,但后期一定失控。生产环境强烈建议引入 LLM Gateway,统一处理以下能力:

  • 模型路由
  • API Key管理
  • 调用超时
  • 重试与熔断
  • 限流与配额
  • 请求审计
  • 成本统计
  • 响应标准化

Gateway接口设计

public interface LlmGatewayClient {

    LlmResponse generate(LlmRequest request);

    default LlmResponse generateWithFallback(LlmRequest primary, LlmRequest fallback) {
        try {
            return generate(primary);
        } catch (Exception ex) {
            return generate(fallback);
        }
    }
}

使用Resilience4j做稳定性治理

@Service
@RequiredArgsConstructor
public class ResilientLlmGatewayClient implements LlmGatewayClient {

    private final ExternalLlmClient externalLlmClient;
    private final CircuitBreaker circuitBreaker;
    private final Retry retry;
    private final TimeLimiter timeLimiter;

    @Override
    public LlmResponse generate(LlmRequest request) {
        Supplier<CompletableFuture<LlmResponse>> supplier = () ->
                CompletableFuture.supplyAsync(() -> externalLlmClient.invoke(request));

        try {
            return Decorators.ofSupplier(supplier)
                    .withCircuitBreaker(circuitBreaker)
                    .withRetry(retry)
                    .withTimeLimiter(timeLimiter)
                    .decorate()
                    .get()
                    .join();
        } catch (Exception e) {
            throw new LlmGatewayException("LLM_GATEWAY_ERROR", e.getMessage(), e);
        }
    }
}

这部分非常关键。真正的线上不稳定,通常不是你自己的Java代码,而是模型侧RT抖动、限流或依赖网络异常。

高并发设计:吞吐、隔离与降级

高并发的瓶颈在哪里

故事创作平台的核心瓶颈通常不是CPU,而是:

  • LLM外部调用耗时
  • 线程池阻塞
  • 数据库连接耗尽
  • Kafka消费堆积
  • Redis热点Key

因此,优化目标不应只是“提高TPS”,而是同时保证:

  • 系统稳定
  • 平均耗时可控
  • 高峰期不雪崩
  • 核心功能优先可用

线程池隔离

不要把Web请求线程、编排线程、Agent执行线程、回调线程混在一起。建议至少拆成三类线程池:

  • apiExecutor:短任务,接入层快速处理
  • orchestratorExecutor:编排调度与状态推进
  • agentWorkerExecutor:实际模型调用与结果解析
@Configuration
public class ExecutorConfig {

    @Bean("orchestratorExecutor")
    public ThreadPoolTaskExecutor orchestratorExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("story-orchestrator-");
        executor.initialize();
        return executor;
    }

    @Bean("agentWorkerExecutor")
    public ThreadPoolTaskExecutor agentWorkerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(32);
        executor.setMaxPoolSize(128);
        executor.setQueueCapacity(5000);
        executor.setThreadNamePrefix("story-agent-worker-");
        executor.initialize();
        return executor;
    }
}

背压控制

当外部模型接口变慢时,如果平台还在无限接收任务,很容易把线程池、消息队列和数据库全部压垮。必须设计背压机制:

  • API Gateway限流
  • Kafka消费速率控制
  • Worker并发度控制
  • 队列长度阈值告警
  • 必要时返回“系统繁忙,请稍后重试”

分级降级策略

在高峰时,建议对不同功能进行分级: 等级 功能 降级策略
P0 新建故事任务 尽量保留
P1 查询任务进度 保留,读缓存
P2 风格重写 可排队延迟
P3 多版本对比、高级润色 高峰时直接关闭

这比“全站一起挂”要现实得多。

可扩展设计:如何支持更多Agent与更多玩法

插件化Agent注册

随着业务发展,常见的新需求包括:

  • 新增“儿童绘本Agent”
  • 新增“游戏支线剧情Agent”
  • 新增“敏感词审核Agent”
  • 新增“角色关系修复Agent”

如果每新增一个Agent都需要改一堆编排代码,系统很快失控。建议做成注册式:

public interface StoryAgent {
    String agentName();
    String stageName();
    AgentExecutionResult execute(AgentExecutionContext context);
}
@Component
public class StoryAgentRegistry {

    private final Map<String, StoryAgent> agentMap;

    public StoryAgentRegistry(List<StoryAgent> agents) {
        this.agentMap = agents.stream()
                .collect(Collectors.toUnmodifiableMap(StoryAgent::stageName, Function.identity()));
    }

    public StoryAgent getByStage(String stage) {
        StoryAgent agent = agentMap.get(stage);
        if (agent == null) {
            throw new IllegalArgumentException("No agent registered for stage: " + stage);
        }
        return agent;
    }
}

这样新增Agent时只需:

  1. 新增实现类
  2. 配置阶段依赖关系
  3. 注册Prompt模板
  4. 接入监控指标

DAG配置化

进一步可以把编排关系做成配置,而不是硬编码:

story:
  workflow:
    stages:
      - name: intent
        dependsOn: []
      - name: plot
        dependsOn: [intent]
      - name: character
        dependsOn: [intent]
      - name: chapter
        dependsOn: [plot, character]
      - name: style
        dependsOn: [chapter]
      - name: review
        dependsOn: [style]

这种方式很适合做不同题材、不同租户、不同会员等级下的流程差异化编排。

数据存储设计:不是只有“存结果”这么简单

表设计建议

至少应拆分为以下几类表:

  • story_task:任务主表
  • story_stage_execution:阶段执行明细
  • story_result:最终结果与多版本内容
  • story_prompt_snapshot:Prompt快照与模型参数
  • story_cost_record:Token和调用成本

阶段执行明细表的价值

很多系统只记录最终结果,不记录中间阶段,这是后期运维的大坑。阶段执行明细至少要记录:

  • 哪个Agent执行
  • 第几次重试
  • 输入摘要
  • 输出摘要
  • 耗时
  • Token消耗
  • 错误码
  • 执行节点

这会直接决定你能不能回答这些线上问题:

  • 为什么这个故事质量变差了?
  • 为什么本周成本飙升?
  • 为什么某个租户总是超时?
  • 为什么同样的Prompt在A环境正常、B环境异常?

Redis设计:缓存不是只存最终结果

Redis在这个平台里至少有四种用途:

  1. 任务进度缓存
    便于前端快速查询进度,减少数据库压力。
  2. 热点Prompt模板缓存
    降低模板加载和解析开销。
  3. 用户偏好短期记忆
    例如用户偏好悬疑风、女性向、轻小说节奏。
  4. 幂等控制与分布式锁
    防止重复消费、重复推进状态机。

示例Key设计:

story:task:progress:{taskId}
story:task:result:{taskId}
story:user:preference:{userId}
story:stage:idempotent:{taskId}:{stage}

要注意两个点:

  • 大文本结果不要无限期放Redis,最终应回写数据库或对象存储
  • 缓存过期时间应与业务查询窗口匹配,避免内存浪费

Kafka设计:让系统真正具备弹性

Topic划分建议

一种比较稳妥的Topic设计是按事件类型拆分,而不是所有消息都混一个Topic:

  • story-task-created
  • story-stage-dispatch
  • story-stage-result
  • story-task-notify
  • story-dead-letter

消费幂等

消息系统天然可能重复投递,因此必须做幂等。典型实现方式:

  • 每个阶段执行前,先检查Redis幂等Key
  • 或者在数据库里对 taskId + stage + attempt 做唯一约束
  • 状态推进使用CAS或乐观锁
public boolean tryExecuteStage(String taskId, String stage) {
    String key = "story:stage:idempotent:" + taskId + ":" + stage;
    Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(30));
    return Boolean.TRUE.equals(success);
}

死信队列

当阶段任务连续失败,不能无限重试。建议:

  • 瞬时错误:指数退避重试2到3次
  • 业务不可恢复错误:直接终止
  • 多次失败:投递死信队列,等待人工排查或离线修复

真实业务案例:高并发“睡前故事工厂”

下面给出一个更真实的案例,帮助理解多智能体系统如何在业务上创造价值。

场景设定

某儿童内容平台推出“睡前故事工厂”功能,面向家长群体,支持输入:

  • 孩子年龄
  • 想要的主题,例如“勇气”“分享”“探索”
  • 喜欢的角色,例如“小狐狸”“太空船长”
  • 期望篇幅,例如800字、1500字

平台目标:

  • 平峰每秒80个请求
  • 高峰每秒500个请求
  • 90线响应:2秒内返回任务受理结果
  • 完整故事平均生成时间:8到20秒

多Agent分工

Agent 作用
Intent Agent 提炼家长真实意图,补齐缺失约束
Education Agent 引入年龄适配和教育目标
Plot Agent 生成故事大纲
Character Agent 定义角色性格与对白风格
Chapter Agent 生成正文
Review Agent 审核措辞、暴力内容、年龄适配

其中 Education Agent 是面向业务价值新增的特殊Agent。它并不直接“写故事”,但能显著提升内容适配度,这是多智能体系统特别有价值的地方。

高峰流量下的执行策略

高峰期采用以下策略:

  • 普通用户请求排队
  • 会员用户进入高优先级队列
  • 1500字以上长故事拆成多章节并行生成
  • Review Agent使用轻量模型先做预审,失败时再切重模型

这种策略同时兼顾了体验、成本和吞吐。

生产级质量保障:测试体系怎么做

单元测试

每个Agent都应支持独立单测,重点验证:

  • Prompt输入组装是否正确
  • LLM响应解析是否稳定
  • 空结果、非法结果是否能识别
@ExtendWith(MockitoExtension.class)
class PlotAgentTest {

    @Mock
    private LlmGatewayClient llmGatewayClient;

    @Mock
    private PromptTemplateEngine promptTemplateEngine;

    @InjectMocks
    private PlotAgent plotAgent;

    @Test
    void should_generate_plot_successfully() {
        StoryCreationContext context = StoryCreationContextFixture.defaultContext();

        when(promptTemplateEngine.render(anyString(), any()))
                .thenReturn("mocked prompt");
        when(llmGatewayClient.generate(any()))
                .thenReturn(new LlmResponse("{\"outline\":[\"第一幕\",\"第二幕\"]}", 500, 800));

        PlotResult result = plotAgent.execute(context);

        assertThat(result.outline()).hasSize(2);
    }
}

集成测试

需要覆盖:

  • 创建任务后是否落库成功
  • 是否成功投递Kafka事件
  • 阶段完成后状态是否正确推进
  • 失败重试后是否能恢复

压测

高并发系统上线前,压测不是可选项。建议至少覆盖三类压测:

  1. 接口层压测
    验证任务创建接口在限流下的稳定性。
  2. 消息堆积压测
    模拟模型侧RT变慢,观察Kafka Lag是否可控。
  3. 端到端压测
    从任务创建到最终结果生成,验证整体吞吐和平均完成时长。

压测指标建议至少观察:

  • QPS
  • P95 / P99延迟
  • Kafka Consumer Lag
  • 线程池活跃数
  • Redis命中率
  • 数据库连接池使用率
  • 模型调用成功率

可观测性建设:没有监控,就没有多智能体生产系统

指标体系

推荐至少建设以下指标:

  • story_task_create_qps
  • story_task_success_rate
  • story_stage_latency_ms
  • story_stage_retry_count
  • llm_call_latency_ms
  • llm_call_error_rate
  • llm_tokens_prompt_total
  • llm_tokens_completion_total
  • story_cost_total

日志体系

日志必须带以下字段:

  • traceId
  • taskId
  • userId
  • stage
  • agentName
  • model
  • latencyMs
  • retryCount
  • errorCode

链路追踪

建议使用OpenTelemetry + Micrometer + Prometheus + Grafana。

重点不是“有没有Trace”,而是要能回答:

  • 某个故事为什么30秒还没完成?
  • 是哪个Agent最慢?
  • 是模型慢,还是队列堆积,还是数据库写入阻塞?

部署与容器化:从可运行到可运营

Dockerfile

FROM eclipse-temurin:21-jre

WORKDIR /app
COPY target/story-platform.jar app.jar

ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75"

EXPOSE 8080

ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: story-orchestrator
  namespace: story-platform
spec:
  replicas: 3
  selector:
    matchLabels:
      app: story-orchestrator
  template:
    metadata:
      labels:
        app: story-orchestrator
    spec:
      containers:
        - name: story-orchestrator
          image: registry.example.com/story-orchestrator:1.0.0
          ports:
            - containerPort: 8080
          env:
            - name: SPRING_PROFILES_ACTIVE
              value: prod
            - name: JAVA_OPTS
              value: "-Xms512m -Xmx2048m"
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2"
              memory: "3Gi"
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 10
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 20

HPA自动扩容

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: story-agent-worker-hpa
  namespace: story-platform
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: story-agent-worker
  minReplicas: 4
  maxReplicas: 30
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 65

如果业务波峰非常明显,建议进一步基于Kafka Lag做弹性扩容,而不只是依赖CPU。

配置示例:application.yml

server:
  port: 8080

spring:
  application:
    name: story-platform
  datasource:
    url: jdbc:postgresql://postgres:5432/story_platform
    username: story_user
    password: ${DB_PASSWORD}
    hikari:
      maximum-pool-size: 30
      minimum-idle: 10
  data:
    redis:
      host: redis
      port: 6379
      timeout: 3000ms
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: story-worker-group
      enable-auto-commit: false
      max-poll-records: 50
    producer:
      acks: all
      retries: 3

management:
  endpoints:
    web:
      exposure:
        include: health,info,prometheus
  endpoint:
    health:
      probes:
        enabled: true

resilience4j:
  circuitbreaker:
    instances:
      llmGateway:
        slidingWindowSize: 20
        failureRateThreshold: 50
  retry:
    instances:
      llmGateway:
        maxAttempts: 3
        waitDuration: 500ms
  timelimiter:
    instances:
      llmGateway:
        timeoutDuration: 8s

story:
  workflow:
    max-retry: 2
    result-expire-hours: 72
  llm:
    primary-model: gpt-4.1
    fallback-model: gpt-4.1-mini
  executor:
    orchestrator-core-size: 16
    worker-core-size: 32

常见故障与处理策略

模型调用超时

处理建议:

  • 设置单阶段超时,不允许无限等待
  • 启用重试,但要限制次数
  • 尝试降级到轻量模型
  • 保留中间结果,支持人工或离线补偿

某个Agent输出格式异常

处理建议:

  • 所有Agent输出必须走JSON Schema校验
  • 格式不合法时先自动修复一次
  • 修复失败则进入失败分支,不要污染下游

Kafka消费堆积

处理建议:

  • 提升消费者实例数
  • 减少单次批量大小
  • 增大分区数
  • 排查是否是下游模型调用慢导致的“假性堆积”

部分阶段成功、部分失败

处理建议:

  • 允许 PARTIAL_SUCCESS
  • 对已经成功的阶段结果做复用
  • 用户侧可以展示“基础稿已生成,润色稍后完成”

这会显著提升用户体验,不必所有失败都表现为“任务失败”。

从单体PoC到企业级平台的演进路线

阶段一:PoC验证期

特点:

  • 所有Agent在单JVM内
  • 直接同步调用模型
  • 结果简单落库

目标:

  • 快速验证业务价值和内容质量

阶段二:业务上线期

特点:

  • API与编排逻辑分层
  • 接入Redis、Kafka
  • 任务异步执行

目标:

  • 承接真实业务流量

阶段三:规模化期

特点:

  • Agent Worker拆分
  • 模型调用统一网关化
  • 完整监控和成本分析体系

目标:

  • 稳定支撑高并发,持续优化质量与成本

阶段四:平台化期

特点:

  • DAG配置化
  • Agent插件化
  • Prompt管理平台化
  • 租户隔离与配额体系

目标:

  • 从“一个业务系统”升级为“多智能体内容生产平台”

最佳实践总结

如果要把这套方案真正落地,我建议优先抓住以下10个关键点:

  1. 不要把故事创作当成同步接口问题,要把它当成异步任务系统来设计。
  2. 多智能体协作优先采用DAG编排,而不是简单串行调用。
  3. Agent必须职责单一、输入输出结构化、可独立测试。
  4. 模型调用必须统一接入LLM Gateway,不要散落在业务代码中。
  5. 高并发优化重点是线程池隔离、背压控制、消息解耦,而不是盲目堆机器。
  6. 所有阶段都要持久化快照,否则质量优化和问题排查几乎无从下手。
  7. 系统必须具备幂等、重试、超时、熔断、死信等分布式治理能力。
  8. 观测维度至少覆盖任务、阶段、模型、成本四个层面。
  9. 架构设计要为新增Agent和新增工作流留出扩展点。
  10. 在内容生产业务里,质量、稳定性、成本必须同时优化,不能只看模型效果。

结语

多智能体系统的真正价值,不是“把一个任务拆成多个Prompt”这么简单,而是把原本不可控的生成式任务,重构为一个可编排、可扩展、可治理、可观测的工程系统。

对于故事创作平台来说,人工智能框架AgentScope Java提供了很好的多智能体抽象基础,而Spring Boot、Kafka、Redis、PostgreSQL、Kubernetes则补足了生产环境所必须的工程能力。二者结合,才能构建一个既有内容生成质量,又能承受高并发和持续演进的企业级平台。

如果你的目标只是做一个Demo,那么单次Prompt也许够用;但如果你的目标是做一个真正上线、可持续运营、可支撑增长的故事创作平台,那么多智能体架构几乎是必经之路。

从架构师视角看,这类系统最核心的不是“会不会调用模型”,而是能不能把模型能力工程化、平台化,并在复杂业务中稳定交付价值。这也是本文试图回答的核心问题。想要获取更多类似的技术架构深度解析与实战代码,欢迎访问云栈社区,与其他开发者一起交流学习。




上一篇:MySQL CPU 飙升500%的紧急应对与根治策略:从问题定位到架构治理
下一篇:生产级实践:Go-Zero数据库自动化从生成到治理全指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-6 00:30 , Processed in 0.601622 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表