在处理耗时较长的接口时,直接采用异步接口是一种常见的解决方案,例如使用 Callable、WebAsyncTask 或 DeferredResult。然而,这些方法通常只能返回单个处理结果。当业务场景要求接口在处理过程中,能够持续、渐进地向客户端反馈中间状态或结果时,这些传统方式就显得力不从心了。
Spring框架为此提供了专门的异步流式响应工具,主要包括 ResponseBodyEmitter、SseEmitter 和 StreamingResponseBody。它们的用法相当直观,在接口中直接返回相应的对象或包裹在 ResponseEntity<T> 中即可。这样一来,接口将变为异步非阻塞的,即使执行耗时操作也不会占用 Servlet 容器的请求线程,从而保障了系统的整体响应能力。
下面我们将逐一探讨这三种工具的具体应用及其适用场景。
ResponseBodyEmitter
ResponseBodyEmitter 适用于需要动态生成内容并分块发送给客户端的场景,例如实时展示文件上传进度、推送日志流或模拟 AI 对话的逐字输出效果。它允许开发者在后台任务执行过程中,多次调用 send 方法向客户端逐步推送数据。
想象一下类似 ChatGPT 的交互体验:提问后,答案并非一次性完整呈现,而是逐字逐句地动态生成。这种做法能显著提升交互的生动感和实时性。ResponseBodyEmitter 就可以轻松实现类似效果。

图1:类似大语言模型的流式对话交互示意图
以下是一个简单的实现示例,它创建一个 ResponseBodyEmitter 对象,并在一个异步任务中模拟耗时操作,每隔2秒向客户端发送一次当前进度信息。
注意:ResponseBodyEmitter 的超时时间可通过构造函数设置。若设为 0 或 -1,则表示连接永不过时;若不设置,则会使用默认超时时间,连接超时后将自动断开。此规则同样适用于后文介绍的另外两种工具。
@GetMapping("/bodyEmitter")
public ResponseBodyEmitter handle() {
// 创建一个ResponseBodyEmitter,-1代表不超时
ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
// 异步执行耗时操作
CompletableFuture.runAsync(() -> {
try {
// 模拟耗时操作
for (int i = 0; i < 10000; i++) {
System.out.println("bodyEmitter " + i);
// 发送数据
emitter.send("bodyEmitter " + i + " @ " + new Date() + "\n");
Thread.sleep(2000);
}
// 完成
emitter.complete();
} catch (Exception e) {
// 发生异常时结束接口
emitter.completeWithError(e);
}
});
return emitter;
}
代码逻辑清晰明了。通过模拟每2秒发送一次数据,当请求该接口时,客户端页面便能观察到数据在持续、动态地追加显示,效果与上述的流式对话体验类似。

图2:ResponseBodyEmitter接口的流式输出日志效果
SseEmitter
SseEmitter 是 ResponseBodyEmitter 的一个子类,专为 服务器向客户端 单向推送实时事件流而设计。它基于 Server-Sent Events (SSE) 协议,非常适合实现实时消息通知、股票报价、体育赛事比分更新等场景。

图3:SSE技术实现服务器向客户端页面推送实时更新
SSE 的本质是在客户端与服务器之间建立一条单向的、长时的 HTTP 连接。服务器响应的 Content-Type 为 text/event-stream,从而可以持续不断地将事件流推送给客户端。这个过程类似于在线视频播放,数据流是连续传输的。

图4:SSE通信中客户端与服务器的事件流交互流程
客户端通过 JavaScript 的 EventSource API 发起连接,之后便进入监听状态,等待接收服务器发来的事件。
<body>
<div id="content" style="text-align: center;">
<h1>SSE 接收服务端事件消息数据</h1>
<div id="message">等待连接...</div>
</div>
<script>
let source = null;
let userId = 7777
function setMessageInnerHTML(message) {
const messageDiv = document.getElementById("message");
const newParagraph = document.createElement("p");
newParagraph.textContent = message;
messageDiv.appendChild(newParagraph);
}
if (window.EventSource) {
// 建立连接
source = new EventSource('http://127.0.0.1:9033/subSseEmitter/'+userId);
setMessageInnerHTML("连接用户=" + userId);
/**
* 连接一旦建立,就会触发open事件
* 另一种写法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
/**
* 客户端收到服务器发来的数据
* 另一种写法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
}
</script>
</body>
服务端需要维护一个 SseEmitter 对象池(例如使用 Map),将连接与用户标识关联起来。当有消息需要推送给特定用户时,便可从池中取出对应的 SseEmitter 并调用其 send 方法。
private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();
@GetMapping("/subSseEmitter/{userId}")
public SseEmitter sseEmitter(@PathVariable String userId) {
log.info("sseEmitter: {}", userId);
SseEmitter emitterTmp = new SseEmitter(-1L);
EMITTER_MAP.put(userId, emitterTmp);
CompletableFuture.runAsync(() -> {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("sseEmitter" + userId + " @ " + LocalTime.now())
.id(String.valueOf(userId))
.name("sseEmitter");
emitterTmp.send(event);
} catch (Exception ex) {
emitterTmp.completeWithError(ex);
}
});
return emitterTmp;
}
@GetMapping("/sendSseMsg/{userId}")
public void sseEmitter(@PathVariable String userId, String msg) throws IOException {
SseEmitter sseEmitter = EMITTER_MAP.get(userId);
if (sseEmitter == null) {
return;
}
sseEmitter.send(msg);
}
此后,通过调用类似 /sendSseMsg/7777?msg=欢迎关注 的接口,即可向 userId 为 7777 的用户实时推送消息,并在其页面上即时展示。

图5:SSE客户端页面实时接收并展示服务端推送的消息
SSE 还有一个重要优势:连接建立后,如果因为网络波动或服务重启导致连接中断,客户端浏览器会自动尝试重新建立连接,保证了通信的鲁棒性。

图6:服务端日志显示SSE连接初始化和对应的控制器信息
StreamingResponseBody
StreamingResponseBody 的设计初衷略有不同,它主要面向大数据量或持续数据流的直接传输。其核心是提供一个 OutputStream,允许开发者将数据直接写入此流中,非常适合处理大文件下载、视频流传输等场景,能够有效避免一次性加载全部数据到内存所导致的内存溢出(OOM)风险。
例如,在下载一个超大文件时,使用 StreamingResponseBody 可以循环读取文件块,并持续写入输出流,实现“边读边传”,对服务器内存非常友好。
接口实现时,直接返回 StreamingResponseBody 对象。在其 writeTo 方法的实现中,将数据循环写入 OutputStream 并适时调用 flush() 即可。每次调用 flush() 都会促使数据向客户端发送一次。
@GetMapping("/streamingResponse")
public ResponseEntity<StreamingResponseBody> handleRbe() {
StreamingResponseBody stream = out -> {
String message = "streamingResponse";
for (int i = 0; i < 1000; i++) {
try {
out.write(((message + i) + "\r\n").getBytes());
out.write("\r\n".getBytes());
//调用一次flush就会向前端写入一次数据
out.flush();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(stream);
}
上面的示例输出的是文本流,若将其中的输出逻辑替换为从文件读取字节流,即可实现大文件的高效下载。

图7:使用StreamingResponseBody接口实现的流式文本输出效果
总结
本文介绍了 Spring 框架中三种用于实现异步流式接口的核心工具:ResponseBodyEmitter、SseEmitter 和 StreamingResponseBody。它们的使用方式简洁明了,但各自针对不同的应用场景:ResponseBodyEmitter 适用于通用分块响应,SseEmitter 专攻服务器推送事件,而 StreamingResponseBody 则擅长处理大容量数据流。合理运用这些工具,能够显著提升应用在处理耗时、实时或大数据量任务时的用户体验和系统性能。
文中 Demo 已开源至 GitHub:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot101/通用功能/springboot-streaming
希望这篇关于 Spring 异步流式响应的介绍能对你有所帮助。更多技术实践与深度讨论,欢迎访问云栈社区。