找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

150

积分

0

好友

18

主题
发表于 前天 02:02 | 查看: 5| 回复: 0

1. 什么是 SSE?

Server-Sent Events (SSE) 是一种基于 HTTP 协议的实时数据推送技术。与 WebSocket 不同,SSE 仅支持服务器到客户端的单向通信,允许服务器主动向客户端发送数据,而无需客户端频繁发起请求。

核心特点:

  • 基于 HTTP 协议,配置简单
  • 单向通信(服务器到客户端),支持复杂消息流
  • 内置自动重连机制,连接中断后可自动恢复
  • 轻量级实现,适合实时更新场景
  • 纯文本数据格式,便于调试和查看

2. SSE vs WebSocket

下表对比了 SSE 和 WebSocket 的主要特性:

特性 SSE WebSocket
通信方向 单向(服务器到客户端) 双向(服务器与客户端互通)
协议 HTTP WebSocket (ws/wss)
复杂度 简单易用 相对复杂,需管理握手流程
自动重连 内置支持 需自行实现
数据格式 纯文本(JSON、文本等) 二进制和文本

如何选择?

  • 若只需服务器向客户端推送实时数据,SSE 是轻量高效的选择
  • 如需双向通信(如聊天室、协作编辑),WebSocket 更为合适

3. SSE 的应用场景

SSE 专为实时数据推送场景设计,典型应用包括:

  • 大语言模型的流式输出:在 人工智能 应用中实时生成文本内容
  • 实时通知提醒:后台任务进度推送、系统消息通知
  • 实时日志显示:服务器日志更新、调试信息推送
  • 金融数据更新:股票价格实时变动推送
  • 社交媒体信息流:用户动态实时更新

4. 代码实现

以下通过 Node.js 后端和 React 前端演示 SSE 实现。

4.1 后端实现

const http = require("http");

// 模拟大模型的响应内容
const mockResponses = [
  "你好!我是AI助手,",
  "我正在处理你的请求。",
  "\n\n",
  "这是一个流式输出的演示,",
  "我会每隔一段时间发送消息。",
  "现在演示即将结束。",
  "\n\n",
  "再见!",
];

// 创建 HTTP 服务器
const server = http.createServer((req, res) => {
  // 设置响应头
  res.setHeader("Access-Control-Allow-Origin", "*");

  // SSE 接口
  if (req.url === "/stream") {
    // 设置 SSE 相关的响应头
    res.writeHead(200, {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    });

    let messageIndex = 0;

    // 发送初始化消息
    res.write(`: This is comment\n`);  // 这是一个注释
    res.write(`retry: 3000\n`);        // 如果断开,3秒后重连

    // 定义发送消息的函数
    const sendMessage = () => {
      if (messageIndex < mockResponses.length) {
        // 发送消息
        res.write(`id: ${messageIndex + 1}\n`);      // 消息ID
        res.write(`event: message\n`);               // 事件类型
        res.write(
          `data: ${JSON.stringify({
            content: mockResponses[messageIndex],
          })}\n\n`
        );
        messageIndex++;

        // 随机延迟 500ms-1000ms
        const delay = Math.floor(Math.random() * 501) + 500;
        setTimeout(sendMessage, delay);
      } else {
        // 发送结束消息
        res.write(`id: final\n`);
        res.write(`event: complete\n`);
        res.write(`data: "stream completed"\n\n`);
        res.end();
      }
    };

    // 开始发送消息
    sendMessage();

    // 监听客户端断开连接
    req.on("close", () => {
      console.log("客户端断开连接");
    });
  }
});

// 服务器监听 3001 端口
const PORT = 3001;
server.listen(PORT, () => {
  console.log(`服务器正在运行,端口: ${PORT}`);
});
服务端实现要点
  1. 设置正确的响应头:

    • Content-Type: text/event-stream:指定数据流格式
    • Cache-Control: no-cache:避免缓存
    • Connection: keep-alive:保持连接活跃
  2. 消息格式规范:

    • : 开头的行作为注释
    • retry: 指定自动重连时间间隔(毫秒)
    • id: 指定消息 ID,用于区分消息
    • event: 指定消息类型,默认为 message
    • data: 指定消息内容,必须以 JSON 格式传递,结尾为 \n\n
  3. 字段顺序通常为:

    id: 消息ID
    event: 事件类型  
    data: 消息内容

4.2 前端实现

import { useState, useRef, useEffect } from "react";
import "./App.css";

function App() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const [connectionStatus, setConnectionStatus] = useState("未连接");
  const eventSourceRef = useRef(null);

  const startSSEStream = async () => {
    setIsStreaming(true);
    setMessages([]);

    const connect = () => {
      setConnectionStatus("正在连接");
      const eventSource = new EventSource("http://localhost:3001/stream");
      eventSourceRef.current = eventSource;

      eventSource.onopen = () => {
        setConnectionStatus("已连接");
      };

      // 监听完成事件
      eventSource.addEventListener("complete", (event) => {
        eventSource.close();
        handleStop();
        setConnectionStatus("已完成");
      });

      eventSource.onmessage = (event) => {
        setMessages((prev) => [
          ...prev,
          JSON.parse(event.data).content,
        ]);
      };

      eventSource.onerror = (error) => {
        console.error("SSE: 连接错误", error);
        setConnectionStatus("连接断开,等待自动重连");
      };
    };

    connect();
  };

  const handleStart = () => {
    startSSEStream();
  };

  const handleStop = () => {
    setIsStreaming(false);
    setConnectionStatus("未连接");
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
  };

  useEffect(() => {
    return () => {
      handleStop();
    };
  }, []);

  return (
    <div className="container">
      <div className="controls">
        <button onClick={handleStart} disabled={isStreaming}>
          {isStreaming ? "正在接收数据..." : "开始流式输出"}
        </button>
        <button onClick={handleStop} disabled={!isStreaming}>
          停止
        </button>
        <span style={{ marginLeft: "10px" }}>
          状态: {connectionStatus}
        </span>
      </div>
      <div
        className="message-container"
        style={{
          color: "#000",
          textAlign: "left",
          whiteSpace: "pre-line",
        }}
      >
        {messages.join("")}
      </div>
    </div>
  );
}

export default App;
前端实现要点:
  1. 事件处理:需要处理三个关键事件

    • onmessage:接收消息
    • onerror:处理错误
    • onopen:连接建立 其他事件使用 addEventListener 监听
  2. 资源管理:在请求完成或组件卸载时,调用 eventSource.close() 关闭连接,避免内存泄漏

4.3 重连续传的实现

SSE 的自动重连基于 EventSource 内置机制,连接错误断开时浏览器会自动重新发起请求。与 WebSocket 不同,此过程无需手动干预。

但重连会重新发起请求,数据从头开始。为实现续传功能,需前后端配合处理。核心是利用 last-event-id 请求头,该字段自动关联服务端指定的 id:,对应上一次请求的最后消息 ID。

后端实现
// ···
const server = http.createServer((req, res) => {
  // ···
  // 获取 Last-Event-ID
  const lastEventId = req.headers["last-event-id"];

  // 确定开始发送的消息索引
  let messageIndex = lastEventId ? parseInt(lastEventId, 10) : 0;

  // 如果 lastEventId 无效或超出范围,从头开始
  if (isNaN(messageIndex) || 
      messageIndex < 0 || 
      messageIndex >= mockResponses.length
  ) {
    messageIndex = 0;
  }
  // ···
});
// ···

5. EventSource 的缺点

EventSource API 存在以下限制,仅支持 url 和 withCredentials 参数:

  1. 无法传递请求体,所有参数必须编码在 URL 中,受浏览器 URL 长度限制(约 2000 字符)
  2. 无法自定义请求头
  3. 仅支持 GET 请求
  4. 自动重连机制无法手动控制

6. 使用 fetch 模拟 EventSource

为解决上述问题,可使用 fetch API 模拟 EventSource 功能,两者均基于 HTTP 协议。

/**
 * FetchEventSource 类用于模拟原生 EventSource 的功能
 * 由于原生 EventSource 存在一些限制(如不能自定义请求头、不支持POST等),
 * 这里使用 fetch API 来实现相同的功能
 */
class FetchEventSource {
  /**
   * @param {string} url - SSE服务器端点URL
   * @param {object} options - fetch请求的配置选项
   */
  constructor(url, options = {}) {
    this.url = url;
    this.options = options;
    this.isActive = false;
    this.lastEventId = null;           // 用于断线重连时的消息追踪
    this.eventListeners = new Map();   // 存储不同类型的事件监听器
    this.start();
  }

  /**
   * 启动SSE连接的核心方法
   * 实现了:
   * 1. 自动重连机制
   * 2. 断点续传(通过lastEventId)
   * 3. 数据流的解析
   */
  async start() {
    if (this.isActive) return;
    this.isActive = true;

    // 支持断点续传:如果存在lastEventId,添加到查询参数中
    const params = this.lastEventId
      ? { lastEventId: this.lastEventId }
      : {};
    const queryString = new URLSearchParams(params).toString();
    const requestUrl = queryString
      ? `${this.url}?${queryString}`
      : this.url;

    // 触发 open 事件,通知连接已建立
    if (this.eventListeners.has("open")) {
      this.eventListeners.get("open").forEach((listener) => listener());
    }

    try {
      // 外层循环:处理重连逻辑
      while (this.isActive) {
        const response = await fetch(requestUrl, this.options);

        // 核心部分:使用 ReadableStream 处理数据流
        const reader = response.body.getReader();
        const decoder = new TextDecoder("utf-8");
        let buffer = "";

        // 内层循环:处理数据流的读取
        while (this.isActive) {
          const { value, done } = await reader.read();
          if (done) break;

          // 将二进制数据解码为文本,并处理粘包问题
          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split("\n");
          buffer = lines.pop() || "";  // 保留最后一个不完整的行

          // 解析每一行数据
          for (const line of lines) {
            this.parseEvent(line.trim());
          }
        }
      }
    } catch (error) {
      console.error("FetchEventSource error:", error);
      this.close();
    }
  }

  /**
   * 解析SSE事件数据
   * 支持标准SSE字段:
   * - id: 消息ID
   * - event: 事件类型
   * - data: 消息数据
   */
  parseEvent(line) {
    if (!line || line.startsWith(":")) return;  // 忽略空行和注释行
    const [key, ...rest] = line.split(":");
    const value = rest.join(":").trim();

    if (key === "id") {
      this.lastEventId = value;
    } else if (key === "event") {
      this.currentEvent = value;
    } else if (key === "data") {
      const event = {
        id: this.lastEventId,
        event: this.currentEvent || "message",
        data: value,
      };
      this.dispatchEvent(event);
    }
  }

  /**
   * 事件分发处理
   * 支持两种监听方式:
   * 1. addEventListener方式
   * 2. onmessage回调方式
   */
  dispatchEvent(event) {
    const eventType = event.event;

    // 触发特定事件监听器
    if (this.eventListeners.has(eventType)) {
      this.eventListeners
        .get(eventType)
        .forEach((listener) => listener(event));
    }

    // 支持传统的onmessage回调
    if (eventType === "message" && typeof this.onmessage === "function") {
      this.onmessage(event);
    }
  }

  /**
   * 添加事件监听器
   * 支持监听自定义事件类型
   */
  addEventListener(eventType, listener) {
    if (!this.eventListeners.has(eventType)) {
      this.eventListeners.set(eventType, []);
    }
    this.eventListeners.get(eventType).push(listener);
  }

  /**
   * 关闭SSE连接
   */
  close() {
    this.isActive = false;
  }
}

export default FetchEventSource;

这段代码实现了 EventSource 的基本功能,核心部分如下:

const response = await fetch(requestUrl, this.options);
// 使用 ReadableStream 处理数据流
const reader = response.body.getReader();
const decoder = new TextDecoder("utf-8");
let buffer = "";

// 内层循环:处理数据流的读取
while (this.isActive) {
  const { value, done } = await reader.read();
  if (done) break;

  // 将二进制数据解码为文本,并处理粘包问题
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() || "";  // 保留最后一个不完整的行

  // 解析每一行数据
  for (const line of lines) {
    this.parseEvent(line.trim());
  }
}

后端返回的是持续输入的 ReadableStream,前端使用 getReaderTextDecoder 将流数据解码为 UTF-8 文本。根据 SSE 标准格式,使用 split('\n') 分段数据,解析为事件并触发相应监听器。

实际应用中推荐使用 @microsoft/fetch-event-source 库,它提供完整实现,自动处理断线重连,支持自定义请求头,原理与示例代码相似。

7. 总结

SSE 是一种轻量级、易用且高效的实时数据流技术,特别适合需要服务器推送数据的应用场景。无论是实时消息推送还是日志展示,SSE 都能提供良好的解决方案。掌握 SSE 技术有助于在前端开发中更好地处理实时数据需求。

您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-1 13:29 , Processed in 0.057582 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表