1. 什么是 SSE?
Server-Sent Events (SSE) 是一种基于 HTTP 协议的实时数据推送技术。与 WebSocket 不同,SSE 仅支持服务器到客户端的单向通信,允许服务器主动向客户端发送数据,而无需客户端频繁发起请求。
核心特点:
- 基于 HTTP 协议,配置简单
- 单向通信(服务器到客户端),支持复杂消息流
- 内置自动重连机制,连接中断后可自动恢复
- 轻量级实现,适合实时更新场景
- 纯文本数据格式,便于调试和查看
2. SSE vs WebSocket
下表对比了 SSE 和 WebSocket 的主要特性:
| 特性 |
SSE |
WebSocket |
| 通信方向 |
单向(服务器到客户端) |
双向(服务器与客户端互通) |
| 协议 |
HTTP |
WebSocket (ws/wss) |
| 复杂度 |
简单易用 |
相对复杂,需管理握手流程 |
| 自动重连 |
内置支持 |
需自行实现 |
| 数据格式 |
纯文本(JSON、文本等) |
二进制和文本 |
如何选择?
- 若只需服务器向客户端推送实时数据,SSE 是轻量高效的选择
- 如需双向通信(如聊天室、协作编辑),WebSocket 更为合适
3. SSE 的应用场景
SSE 专为实时数据推送场景设计,典型应用包括:
- 大语言模型的流式输出:在 人工智能 应用中实时生成文本内容
- 实时通知提醒:后台任务进度推送、系统消息通知
- 实时日志显示:服务器日志更新、调试信息推送
- 金融数据更新:股票价格实时变动推送
- 社交媒体信息流:用户动态实时更新
4. 代码实现
以下通过 Node.js 后端和 React 前端演示 SSE 实现。
4.1 后端实现
const http = require("http");
// 模拟大模型的响应内容
const mockResponses = [
"你好!我是AI助手,",
"我正在处理你的请求。",
"\n\n",
"这是一个流式输出的演示,",
"我会每隔一段时间发送消息。",
"现在演示即将结束。",
"\n\n",
"再见!",
];
// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
// 设置响应头
res.setHeader("Access-Control-Allow-Origin", "*");
// SSE 接口
if (req.url === "/stream") {
// 设置 SSE 相关的响应头
res.writeHead(200, {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
});
let messageIndex = 0;
// 发送初始化消息
res.write(`: This is comment\n`); // 这是一个注释
res.write(`retry: 3000\n`); // 如果断开,3秒后重连
// 定义发送消息的函数
const sendMessage = () => {
if (messageIndex < mockResponses.length) {
// 发送消息
res.write(`id: ${messageIndex + 1}\n`); // 消息ID
res.write(`event: message\n`); // 事件类型
res.write(
`data: ${JSON.stringify({
content: mockResponses[messageIndex],
})}\n\n`
);
messageIndex++;
// 随机延迟 500ms-1000ms
const delay = Math.floor(Math.random() * 501) + 500;
setTimeout(sendMessage, delay);
} else {
// 发送结束消息
res.write(`id: final\n`);
res.write(`event: complete\n`);
res.write(`data: "stream completed"\n\n`);
res.end();
}
};
// 开始发送消息
sendMessage();
// 监听客户端断开连接
req.on("close", () => {
console.log("客户端断开连接");
});
}
});
// 服务器监听 3001 端口
const PORT = 3001;
server.listen(PORT, () => {
console.log(`服务器正在运行,端口: ${PORT}`);
});
服务端实现要点
-
设置正确的响应头:
Content-Type: text/event-stream:指定数据流格式
Cache-Control: no-cache:避免缓存
Connection: keep-alive:保持连接活跃
-
消息格式规范:
- 以
: 开头的行作为注释
retry: 指定自动重连时间间隔(毫秒)
id: 指定消息 ID,用于区分消息
event: 指定消息类型,默认为 message
data: 指定消息内容,必须以 JSON 格式传递,结尾为 \n\n
-
字段顺序通常为:
id: 消息ID
event: 事件类型
data: 消息内容
4.2 前端实现
import { useState, useRef, useEffect } from "react";
import "./App.css";
function App() {
const [messages, setMessages] = useState([]);
const [isStreaming, setIsStreaming] = useState(false);
const [connectionStatus, setConnectionStatus] = useState("未连接");
const eventSourceRef = useRef(null);
const startSSEStream = async () => {
setIsStreaming(true);
setMessages([]);
const connect = () => {
setConnectionStatus("正在连接");
const eventSource = new EventSource("http://localhost:3001/stream");
eventSourceRef.current = eventSource;
eventSource.onopen = () => {
setConnectionStatus("已连接");
};
// 监听完成事件
eventSource.addEventListener("complete", (event) => {
eventSource.close();
handleStop();
setConnectionStatus("已完成");
});
eventSource.onmessage = (event) => {
setMessages((prev) => [
...prev,
JSON.parse(event.data).content,
]);
};
eventSource.onerror = (error) => {
console.error("SSE: 连接错误", error);
setConnectionStatus("连接断开,等待自动重连");
};
};
connect();
};
const handleStart = () => {
startSSEStream();
};
const handleStop = () => {
setIsStreaming(false);
setConnectionStatus("未连接");
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
};
useEffect(() => {
return () => {
handleStop();
};
}, []);
return (
<div className="container">
<div className="controls">
<button onClick={handleStart} disabled={isStreaming}>
{isStreaming ? "正在接收数据..." : "开始流式输出"}
</button>
<button onClick={handleStop} disabled={!isStreaming}>
停止
</button>
<span style={{ marginLeft: "10px" }}>
状态: {connectionStatus}
</span>
</div>
<div
className="message-container"
style={{
color: "#000",
textAlign: "left",
whiteSpace: "pre-line",
}}
>
{messages.join("")}
</div>
</div>
);
}
export default App;
前端实现要点:
-
事件处理:需要处理三个关键事件
onmessage:接收消息
onerror:处理错误
onopen:连接建立
其他事件使用 addEventListener 监听
-
资源管理:在请求完成或组件卸载时,调用 eventSource.close() 关闭连接,避免内存泄漏
4.3 重连续传的实现
SSE 的自动重连基于 EventSource 内置机制,连接错误断开时浏览器会自动重新发起请求。与 WebSocket 不同,此过程无需手动干预。
但重连会重新发起请求,数据从头开始。为实现续传功能,需前后端配合处理。核心是利用 last-event-id 请求头,该字段自动关联服务端指定的 id:,对应上一次请求的最后消息 ID。
后端实现
// ···
const server = http.createServer((req, res) => {
// ···
// 获取 Last-Event-ID
const lastEventId = req.headers["last-event-id"];
// 确定开始发送的消息索引
let messageIndex = lastEventId ? parseInt(lastEventId, 10) : 0;
// 如果 lastEventId 无效或超出范围,从头开始
if (isNaN(messageIndex) ||
messageIndex < 0 ||
messageIndex >= mockResponses.length
) {
messageIndex = 0;
}
// ···
});
// ···
5. EventSource 的缺点
EventSource API 存在以下限制,仅支持 url 和 withCredentials 参数:
- 无法传递请求体,所有参数必须编码在 URL 中,受浏览器 URL 长度限制(约 2000 字符)
- 无法自定义请求头
- 仅支持 GET 请求
- 自动重连机制无法手动控制
6. 使用 fetch 模拟 EventSource
为解决上述问题,可使用 fetch API 模拟 EventSource 功能,两者均基于 HTTP 协议。
/**
* FetchEventSource 类用于模拟原生 EventSource 的功能
* 由于原生 EventSource 存在一些限制(如不能自定义请求头、不支持POST等),
* 这里使用 fetch API 来实现相同的功能
*/
class FetchEventSource {
/**
* @param {string} url - SSE服务器端点URL
* @param {object} options - fetch请求的配置选项
*/
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.isActive = false;
this.lastEventId = null; // 用于断线重连时的消息追踪
this.eventListeners = new Map(); // 存储不同类型的事件监听器
this.start();
}
/**
* 启动SSE连接的核心方法
* 实现了:
* 1. 自动重连机制
* 2. 断点续传(通过lastEventId)
* 3. 数据流的解析
*/
async start() {
if (this.isActive) return;
this.isActive = true;
// 支持断点续传:如果存在lastEventId,添加到查询参数中
const params = this.lastEventId
? { lastEventId: this.lastEventId }
: {};
const queryString = new URLSearchParams(params).toString();
const requestUrl = queryString
? `${this.url}?${queryString}`
: this.url;
// 触发 open 事件,通知连接已建立
if (this.eventListeners.has("open")) {
this.eventListeners.get("open").forEach((listener) => listener());
}
try {
// 外层循环:处理重连逻辑
while (this.isActive) {
const response = await fetch(requestUrl, this.options);
// 核心部分:使用 ReadableStream 处理数据流
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
// 内层循环:处理数据流的读取
while (this.isActive) {
const { value, done } = await reader.read();
if (done) break;
// 将二进制数据解码为文本,并处理粘包问题
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // 保留最后一个不完整的行
// 解析每一行数据
for (const line of lines) {
this.parseEvent(line.trim());
}
}
}
} catch (error) {
console.error("FetchEventSource error:", error);
this.close();
}
}
/**
* 解析SSE事件数据
* 支持标准SSE字段:
* - id: 消息ID
* - event: 事件类型
* - data: 消息数据
*/
parseEvent(line) {
if (!line || line.startsWith(":")) return; // 忽略空行和注释行
const [key, ...rest] = line.split(":");
const value = rest.join(":").trim();
if (key === "id") {
this.lastEventId = value;
} else if (key === "event") {
this.currentEvent = value;
} else if (key === "data") {
const event = {
id: this.lastEventId,
event: this.currentEvent || "message",
data: value,
};
this.dispatchEvent(event);
}
}
/**
* 事件分发处理
* 支持两种监听方式:
* 1. addEventListener方式
* 2. onmessage回调方式
*/
dispatchEvent(event) {
const eventType = event.event;
// 触发特定事件监听器
if (this.eventListeners.has(eventType)) {
this.eventListeners
.get(eventType)
.forEach((listener) => listener(event));
}
// 支持传统的onmessage回调
if (eventType === "message" && typeof this.onmessage === "function") {
this.onmessage(event);
}
}
/**
* 添加事件监听器
* 支持监听自定义事件类型
*/
addEventListener(eventType, listener) {
if (!this.eventListeners.has(eventType)) {
this.eventListeners.set(eventType, []);
}
this.eventListeners.get(eventType).push(listener);
}
/**
* 关闭SSE连接
*/
close() {
this.isActive = false;
}
}
export default FetchEventSource;
这段代码实现了 EventSource 的基本功能,核心部分如下:
const response = await fetch(requestUrl, this.options);
// 使用 ReadableStream 处理数据流
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";
// 内层循环:处理数据流的读取
while (this.isActive) {
const { value, done } = await reader.read();
if (done) break;
// 将二进制数据解码为文本,并处理粘包问题
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // 保留最后一个不完整的行
// 解析每一行数据
for (const line of lines) {
this.parseEvent(line.trim());
}
}
后端返回的是持续输入的 ReadableStream,前端使用 getReader 和 TextDecoder 将流数据解码为 UTF-8 文本。根据 SSE 标准格式,使用 split('\n') 分段数据,解析为事件并触发相应监听器。
实际应用中推荐使用 @microsoft/fetch-event-source 库,它提供完整实现,自动处理断线重连,支持自定义请求头,原理与示例代码相似。
7. 总结
SSE 是一种轻量级、易用且高效的实时数据流技术,特别适合需要服务器推送数据的应用场景。无论是实时消息推送还是日志展示,SSE 都能提供良好的解决方案。掌握 SSE 技术有助于在前端开发中更好地处理实时数据需求。