引言:从“等待答案”到“实时对话”的范式跃迁
在构建企业级 AI 应用时,响应延迟是用户体验的致命杀手。传统方案中,用户提交问题后需等待整个 LLM 推理完成才能看到结果——这在复杂任务(如长文生成、多步推理)中尤为明显。
阿里巴巴开源的 Spring AI Alibaba 框架,通过其 Graph Core 模块与 Spring AI 原生集成,为 Java 开发者提供了一套完整的 LLM 流式输出解决方案。本文将深入剖析其三种核心使用模式,并提供可直接落地的工程实践指南,相关技术讨论欢迎在云栈社区的开发者论坛进行。
一、技术全景:Spring AI Alibaba 流式架构
Spring AI Alibaba 的流式能力建立在两大基石之上:
- Spring AI 的
ChatClient.stream()
提供对主流 LLM(通义千问、GPT、Claude 等)的统一异步流式调用接口;
- Graph Core 的
Flux<NodeOutput>
将节点级流式输出提升至图级别,实现端到端的事件驱动工作流。
二者结合,形成如下分层架构:
┌───────────────────────┐
│ 应用层 (Web UI) │ ← SSE / WebSocket
└───────────┬───────────┘
↓
┌───────────────────────┐
│ Graph Core 流式 API │ ← Flux<NodeOutput>
└───────────┬───────────┘
↓
┌───────────────────────┐
│ 节点内 LLM 流式调用 │ ← Flux<ChatResponse>
└───────────┬───────────┘
↓
┌───────────────────────┐
│ LLM 服务 (Qwen等) │
└───────────────────────┘
✅ 核心价值:开发者无需处理底层连接管理、背压控制、错误重试,专注业务逻辑。
二、模式一:基础流式调用 —— 直接使用 ChatClient
这是最轻量级的流式方案,适用于简单问答场景。
2.1 代码实现
import org.springframework.ai.chat.client.ChatClient;
import reactor.core.publisher.Flux;
// 获取流式响应
Flux<ChatResponse> flux = chatClient.prompt()
.user("讲个笑话")
.stream() // ← 关键:启用流式
.chatResponse();
// 订阅并实时输出
flux.subscribe(
response -> {
String content = response.getResult().getOutput().getText();
System.out.print(content); // 逐 token 打印
},
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("\nStream completed")
);
2.2 输出效果
Sure, here's a joke for you:
Why don't scientists trust atoms?
Because they make up everything!
Stream completed
2.3 适用场景
- 单轮对话机器人;
- 简单内容生成(如摘要、翻译);
- 快速原型验证。
⚠️ 局限:无法嵌入复杂工作流,缺乏状态管理与中断恢复能力。
三、模式二:阻塞式流处理 —— 兼顾流式与聚合
某些场景下,既需要流式体验,又需最终完整结果进行后续处理(如存库、分析)。
3.1 代码实现
Flux<ChatResponse> flux = chatClient.prompt()
.user("讲个笑话")
.stream()
.chatResponse();
// 阻塞等待所有 token 到达,然后聚合处理
List<ChatResponse> responses = flux.collectList().block();
responses.forEach(response -> {
String content = response.getResult().getOutput().getText();
System.out.println("Received: " + content);
});
3.2 技术本质
collectList():将 Flux<T> 转换为 Mono<List<T>>;
.block():阻塞当前线程直到流完成(仅限非 Web 场景!)。
3.3 适用场景
- 后台批处理任务;
- 测试脚本;
- 需要完整响应体的同步逻辑。
❗ 警告:在 WebFlux 或高并发环境中,禁止使用 .block(),应改用 flatMap 等异步操作符。
四、模式三:Graph 节点流式 —— 构建智能工作流
这是企业级应用的核心模式,将 LLM 流式能力嵌入有状态、可中断、可恢复的工作流。
4.1 步骤 1:创建流式节点
public class StreamingAgentNode implements NodeAction {
private final ChatClient chatClient;
public StreamingAgentNode(ChatClient.Builder builder) {
this.chatClient = builder.build();
}
@Override
public Map<String, Object> apply(OverAllState state) {
String userMessage = (String) state.value("query").orElse("Hello");
// 返回 LLM 的 token 流(Flux<String>)
Flux<String> contentFlux = chatClient.prompt()
.user(userMessage)
.stream()
.content(); // ← 直接获取文本流
return Map.of("answer", contentFlux);
}
}
关键设计
- 返回
Flux 对象:框架自动处理流式消费;
- 状态策略:需配置
AppendStrategy 以支持流聚合(见 4.2)。
4.2 步骤 2:配置状态策略
KeyStrategyFactory keyStrategyFactory = () -> Map.of(
"query", new ReplaceStrategy(),
"answer", new AppendStrategy() // ← 必须!用于累积 token
);
💡 原理:当后续节点访问 answer 时,框架已自动完成对 Flux 的订阅,并将所有 token 拼接为完整字符串。
4.3 步骤 3:组装并运行图
// 构建工作流
StateGraph graph = new StateGraph(keyStrategyFactory)
.addNode("agent", new StreamingAgentNode(chatClientBuilder))
.addEdge(StateGraph.START, "agent")
.addEdge("agent", StateGraph.END);
CompiledGraph compiledGraph = graph.compile();
// 执行(非流式)
Map<String, Object> input = Map.of("query", "Hello");
OverAllState result = compiledGraph.invoke(input);
System.out.println("Final result: " + result.value("answer").orElse(""));
4.4 步骤 4:启用图级别流式输出(高级)
若需在图执行过程中实时监听 token,使用 graph.stream():
Flux<NodeOutput> stream = compiledGraph.stream(input, config);
stream.subscribe(output -> {
if (output instanceof StreamingOutput<?> streaming) {
System.out.print(streaming.chunk()); // 实时打印 token
}
});
五、工程实践:Web 应用中的流式集成
在 Spring WebFlux 中,直接暴露 Flux 即可实现 Server-Sent Events (SSE)。这对于构建实时交互的人工智能应用非常有用。
@RestController
public class ChatController {
private final CompiledGraph graph;
@GetMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<NodeOutput> streamChat(@RequestParam String query) {
RunnableConfig config = RunnableConfig.builder()
.threadId(UUID.randomUUID().toString())
.build();
return graph.stream(Map.of("query", query), config);
}
}
前端消费示例(JavaScript)
const eventSource = new EventSource('/chat/stream?query=你好');
eventSource.onmessage = (event) => {
const output = JSON.parse(event.data);
if (output.type === 'StreamingOutput') {
document.getElementById('response').innerText += output.chunk;
}
};
✅ 优势:零额外依赖,天然支持浏览器原生 SSE。
六、最佳实践与避坑指南
6.1 错误处理
始终为流添加错误处理器:
flux.doOnError(error -> log.error("LLM 流式错误", error))
.onErrorReturn(defaultResponse);
6.2 资源清理
使用 doFinally 释放资源:
flux.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE || signalType == SignalType.ON_ERROR) {
cleanupResources();
}
});
6.3 性能优化
- 避免阻塞:绝不在线程池中调用
.block();
- 背压控制:对高吞吐场景使用
.onBackpressureBuffer();
- 连接复用:确保
ChatClient 使用 HTTP 连接池。
6.4 安全考量
- 输入过滤:防止 prompt injection 攻击;
- 速率限制:在网关层限制
/stream 接口的 QPS。
七、总结:流式是生产级 AI 应用的标配
Spring AI Alibaba 通过三层流式抽象——基础 LLM 流、阻塞聚合流、图工作流流——为不同复杂度的场景提供了恰到好处的解决方案。这为希望将智能体能力集成到现有Java后端系统中的开发者提供了清晰的路径。
- 简单场景:直接使用
ChatClient.stream();
- 复杂工作流:通过
StreamingAgentNode + graph.stream() 构建有状态、可观察的智能体;
- Web 集成:无缝对接 WebFlux,实现低延迟用户体验。
在 LLM 应用从“能用”走向“好用”的今天,流式输出已不再是可选项,而是用户体验的底线要求。Spring AI Alibaba 为此提供了一套符合 Java 工程师习惯、生产就绪的现代化工具链。
未来展望:随着多模态模型的普及,流式输出或将扩展至图像生成(progressive rendering)、语音合成(streaming TTS)等领域,成为智能体“多感官输出”的基础。