找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1821

积分

0

好友

255

主题
发表于 2025-12-31 02:31:32 | 查看: 23| 回复: 0

在处理耗时较长的接口时,直接采用异步接口是一种常见的解决方案,例如使用 CallableWebAsyncTaskDeferredResult。然而,这些方法通常只能返回单个处理结果。当业务场景要求接口在处理过程中,能够持续、渐进地向客户端反馈中间状态或结果时,这些传统方式就显得力不从心了。

Spring框架为此提供了专门的异步流式响应工具,主要包括 ResponseBodyEmitterSseEmitterStreamingResponseBody。它们的用法相当直观,在接口中直接返回相应的对象或包裹在 ResponseEntity<T> 中即可。这样一来,接口将变为异步非阻塞的,即使执行耗时操作也不会占用 Servlet 容器的请求线程,从而保障了系统的整体响应能力。

下面我们将逐一探讨这三种工具的具体应用及其适用场景。

ResponseBodyEmitter

ResponseBodyEmitter 适用于需要动态生成内容并分块发送给客户端的场景,例如实时展示文件上传进度、推送日志流或模拟 AI 对话的逐字输出效果。它允许开发者在后台任务执行过程中,多次调用 send 方法向客户端逐步推送数据。

想象一下类似 ChatGPT 的交互体验:提问后,答案并非一次性完整呈现,而是逐字逐句地动态生成。这种做法能显著提升交互的生动感和实时性。ResponseBodyEmitter 就可以轻松实现类似效果。

模拟流式对话交互示意图
图1:类似大语言模型的流式对话交互示意图

以下是一个简单的实现示例,它创建一个 ResponseBodyEmitter 对象,并在一个异步任务中模拟耗时操作,每隔2秒向客户端发送一次当前进度信息。

注意ResponseBodyEmitter 的超时时间可通过构造函数设置。若设为 0-1,则表示连接永不过时;若不设置,则会使用默认超时时间,连接超时后将自动断开。此规则同样适用于后文介绍的另外两种工具。

@GetMapping("/bodyEmitter")
public ResponseBodyEmitter handle() {
    // 创建一个ResponseBodyEmitter,-1代表不超时
    ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
    // 异步执行耗时操作
    CompletableFuture.runAsync(() -> {
        try {
            // 模拟耗时操作
            for (int i = 0; i < 10000; i++) {
                System.out.println("bodyEmitter " + i);
                // 发送数据
                emitter.send("bodyEmitter " + i + " @ " + new Date() + "\n");
                Thread.sleep(2000);
            }
            // 完成
            emitter.complete();
        } catch (Exception e) {
            // 发生异常时结束接口
            emitter.completeWithError(e);
        }
    });
    return emitter;
}

代码逻辑清晰明了。通过模拟每2秒发送一次数据,当请求该接口时,客户端页面便能观察到数据在持续、动态地追加显示,效果与上述的流式对话体验类似。

ResponseBodyEmitter接口流式输出日志
图2:ResponseBodyEmitter接口的流式输出日志效果

SseEmitter

SseEmitterResponseBodyEmitter 的一个子类,专为 服务器向客户端 单向推送实时事件流而设计。它基于 Server-Sent Events (SSE) 协议,非常适合实现实时消息通知、股票报价、体育赛事比分更新等场景。

Server-Sent Events (SSE) 工作原理图
图3:SSE技术实现服务器向客户端页面推送实时更新

SSE 的本质是在客户端与服务器之间建立一条单向的、长时的 HTTP 连接。服务器响应的 Content-Typetext/event-stream,从而可以持续不断地将事件流推送给客户端。这个过程类似于在线视频播放,数据流是连续传输的。

SSE通信流程示意图
图4:SSE通信中客户端与服务器的事件流交互流程

客户端通过 JavaScript 的 EventSource API 发起连接,之后便进入监听状态,等待接收服务器发来的事件。

<body>
    <div id="content" style="text-align: center;">
        <h1>SSE 接收服务端事件消息数据</h1>
        <div id="message">等待连接...</div>
    </div>
    <script>
        let source = null;
        let userId = 7777

        function setMessageInnerHTML(message) {
            const messageDiv = document.getElementById("message");
            const newParagraph = document.createElement("p");
            newParagraph.textContent = message;
            messageDiv.appendChild(newParagraph);
        }

        if (window.EventSource) {
            // 建立连接
            source = new EventSource('http://127.0.0.1:9033/subSseEmitter/'+userId);
            setMessageInnerHTML("连接用户=" + userId);
            /**
             * 连接一旦建立,就会触发open事件
             * 另一种写法:source.onopen = function (event) {}
             */
            source.addEventListener('open', function (e) {
                setMessageInnerHTML("建立连接。。。");
            }, false);
            /**
             * 客户端收到服务器发来的数据
             * 另一种写法:source.onmessage = function (event) {}
             */
            source.addEventListener('message', function (e) {
                setMessageInnerHTML(e.data);
            });
        } else {
            setMessageInnerHTML("你的浏览器不支持SSE");
        }
    </script>
</body>

服务端需要维护一个 SseEmitter 对象池(例如使用 Map),将连接与用户标识关联起来。当有消息需要推送给特定用户时,便可从池中取出对应的 SseEmitter 并调用其 send 方法。

private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();

@GetMapping("/subSseEmitter/{userId}")
public SseEmitter sseEmitter(@PathVariable String userId) {
    log.info("sseEmitter: {}", userId);
    SseEmitter emitterTmp = new SseEmitter(-1L);
    EMITTER_MAP.put(userId, emitterTmp);
    CompletableFuture.runAsync(() -> {
        try {
            SseEmitter.SseEventBuilder event = SseEmitter.event()
                    .data("sseEmitter" + userId + " @ " + LocalTime.now())
                    .id(String.valueOf(userId))
                    .name("sseEmitter");
            emitterTmp.send(event);
        } catch (Exception ex) {
            emitterTmp.completeWithError(ex);
        }
    });
    return emitterTmp;
}

@GetMapping("/sendSseMsg/{userId}")
public void sseEmitter(@PathVariable String userId, String msg) throws IOException {
    SseEmitter sseEmitter = EMITTER_MAP.get(userId);
    if (sseEmitter == null) {
        return;
    }
    sseEmitter.send(msg);
}

此后,通过调用类似 /sendSseMsg/7777?msg=欢迎关注 的接口,即可向 userId 为 7777 的用户实时推送消息,并在其页面上即时展示。

SSE客户端接收消息演示
图5:SSE客户端页面实时接收并展示服务端推送的消息

SSE 还有一个重要优势:连接建立后,如果因为网络波动或服务重启导致连接中断,客户端浏览器会自动尝试重新建立连接,保证了通信的鲁棒性。

服务端SSE连接初始化日志
图6:服务端日志显示SSE连接初始化和对应的控制器信息

StreamingResponseBody

StreamingResponseBody 的设计初衷略有不同,它主要面向大数据量或持续数据流的直接传输。其核心是提供一个 OutputStream,允许开发者将数据直接写入此流中,非常适合处理大文件下载、视频流传输等场景,能够有效避免一次性加载全部数据到内存所导致的内存溢出(OOM)风险。

例如,在下载一个超大文件时,使用 StreamingResponseBody 可以循环读取文件块,并持续写入输出流,实现“边读边传”,对服务器内存非常友好。

接口实现时,直接返回 StreamingResponseBody 对象。在其 writeTo 方法的实现中,将数据循环写入 OutputStream 并适时调用 flush() 即可。每次调用 flush() 都会促使数据向客户端发送一次。

@GetMapping("/streamingResponse")
public ResponseEntity<StreamingResponseBody> handleRbe() {

    StreamingResponseBody stream = out -> {
        String message = "streamingResponse";
        for (int i = 0; i < 1000; i++) {
            try {
                out.write(((message + i) + "\r\n").getBytes());
                out.write("\r\n".getBytes());
                //调用一次flush就会向前端写入一次数据
                out.flush();
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(stream);
}

上面的示例输出的是文本流,若将其中的输出逻辑替换为从文件读取字节流,即可实现大文件的高效下载。

StreamingResponseBody流式输出效果
图7:使用StreamingResponseBody接口实现的流式文本输出效果

总结

本文介绍了 Spring 框架中三种用于实现异步流式接口的核心工具:ResponseBodyEmitterSseEmitterStreamingResponseBody。它们的使用方式简洁明了,但各自针对不同的应用场景:ResponseBodyEmitter 适用于通用分块响应,SseEmitter 专攻服务器推送事件,而 StreamingResponseBody 则擅长处理大容量数据流。合理运用这些工具,能够显著提升应用在处理耗时、实时或大数据量任务时的用户体验和系统性能。

文中 Demo 已开源至 GitHub:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot101/通用功能/springboot-streaming

希望这篇关于 Spring 异步流式响应的介绍能对你有所帮助。更多技术实践与深度讨论,欢迎访问云栈社区。




上一篇:Manticore Search:号称取代Elasticsearch的高性能搜索引擎实测
下一篇:代码文档生成工具对比:Qoder Repo Wiki、Code Wiki、Zread与DeepWiki-Open深度测评
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 09:07 , Processed in 0.343105 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表