找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

373

积分

0

好友

49

主题
发表于 前天 00:18 | 查看: 6| 回复: 0

第一章:Server-Sent Events(SSE)基础概念与原理

1.1 什么是 Server-Sent Events?

Server-Sent Events(SSE)是一种基于 HTTP 的单向实时通信协议,允许服务器向客户端主动推送数据。与 WebSocket 不同,SSE 是单工通信(仅服务器 → 客户端),且基于标准 HTTP/1.1 或 HTTP/2,无需额外协议支持。

SSE 的核心特点:

  • 使用 text/event-stream MIME 类型
  • 自动重连机制(通过 retry 字段)
  • 支持事件 ID(用于断线重传)
  • 原生浏览器支持(EventSource API)
  • 轻量、简单、易于调试

1.2 SSE vs WebSocket vs 长轮询

特性 SSE WebSocket 长轮询
通信方向 单向(服务端→客户端) 双向 模拟双向(实际为请求-响应)
协议 HTTP 独立协议(ws/wss) HTTP
浏览器支持 广泛(除 IE) 广泛 全部
实现复杂度 中高
连接开销 低(复用 HTTP) 中(需握手) 高(频繁请求)
适用场景 实时通知、日志流、股票行情 聊天、游戏、协同编辑 兼容旧浏览器

结论:若只需服务器向客户端推送数据,SSE 是比 WebSocket 更轻量、更易维护的选择。

1.3 SSE 协议格式详解

SSE 数据以纯文本形式通过 HTTP 响应流发送,格式如下:

event: message
data: {"time":"2025-12-03T10:00:00Z","msg":"Hello SSE"}
id: 123
retry: 5000
  • data::必须字段,表示消息内容(可多行,以空行结束)
  • event::可选,指定事件类型(客户端可监听特定事件)
  • id::消息 ID,用于断线后重传(浏览器自动在 Last-Event-ID 请求头中携带)
  • retry::重连间隔(毫秒)

⚠️ 注意:每行以\n结尾,整个消息块以\n\n结束。

1.4 浏览器端使用 EventSource

const eventSource = new EventSource('/sse/notifications');

eventSource.onmessage = (e) => {
    console.log('默认事件:', e.data);
};

eventSource.addEventListener('alert', (e) => {
    console.log('自定义 alert 事件:', e.data);
});

eventSource.onerror = (err) => {
    console.error('SSE 连接错误:', err);
};

浏览器会自动处理:

  • 自动重连(失败后按 retry 值重试)
  • 断线后携带 Last-Event-ID
  • 自动解析 dataeventid 字段

1.5 ASP.NET Core 对 SSE 的原生支持

ASP.NET Core 没有内置 SSE 中间件,但可通过以下方式实现:

  • 手动设置 Content-Type: text/event-stream
  • 使用 HttpResponse.BodyWriterStreamWriter 写入流
  • 利用 CancellationToken 监听连接中断
  • 结合 IHostedServiceChannel<T> 实现消息广播

关键点:SSE 本质是长连接 HTTP 响应流,需保持连接打开并持续写入。

1.6 本章小结

  • SSE 是轻量级服务器推送技术,适用于单向实时场景
  • 协议简单,基于 HTTP,天然穿越防火墙
  • 浏览器原生支持 EventSource,自动处理重连与 ID
  • ASP.NET Core 需手动实现 SSE 响应逻辑

第二章:在 ASP.NET Core 10 中实现基础 SSE 接口

本章目标: ✅ 创建一个可工作的 SSE 端点 ✅ 理解 ASP.NET Core 中如何管理长连接流 ✅ 实现基本的消息推送(如时间戳、通知) ✅ 正确处理客户端断开与资源释放

2.1 创建 ASP.NET Core 10 项目

首先,确保你已安装 .NET 10 SDK。使用 CLI 创建 Web API 项目:

dotnet new webapi -n SseDemo
cd SseDemo

删除 WeatherForecastController.cs(示例文件),我们将从零构建 SSE 控制器。

2.2 编写基础 SSE 控制器

创建 Controllers/SseController.cs

using Microsoft.AspNetCore.Mvc;

namespace SseDemo.Controllers;

[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
    [HttpGet("time")]
    public async Task TimeStream(CancellationToken cancellationToken)
    {
        // 设置响应头
        Response.Headers.ContentType = "text/event-stream";
        Response.Headers.CacheControl = "no-cache";
        Response.Headers.Connection = "keep-alive";

        var writer = new StreamWriter(Response.Body, leaveOpen: true)
        {
            AutoFlush = true // 关键!确保数据立即发送
        };

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var message = $"data: {{\"time\": \"{DateTime.UtcNow:O}\"}}\n\n";
                await writer.WriteAsync(message);
                // 每秒推送一次
                await Task.Delay(1000, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // 客户端断开连接时会触发此异常(正常)
        }
        finally
        {
            await writer.DisposeAsync();
        }
    }
}
关键点解析:
  1. 返回类型为 Task:不使用 IActionResult,因为我们要直接操作 Response.Body
  2. 设置响应头
    • Content-Type: text/event-stream:告知浏览器这是 SSE 流
    • Cache-Control: no-cache:防止代理缓存
    • Connection: keep-alive:保持 TCP 连接
  3. StreamWriter + AutoFlush = true:若不启用自动刷新,数据可能滞留在缓冲区,导致客户端收不到实时消息。
  4. CancellationToken 监听中断:当用户关闭页面或网络断开,ASP.NET Core 会自动触发 cancellationToken,退出循环。
  5. 消息格式:使用标准 SSE 格式:data: {...}\n\n

2.3 测试 SSE 接口

方法一:浏览器控制台测试

在 HTML 页面中添加:

<script>
  const es = new EventSource('/api/sse/time');
  es.onmessage = e => console.log(JSON.parse(e.data));
</script>

打开开发者工具 → Console,将看到每秒输出 UTC 时间。

方法二:使用 curl(仅查看原始流)
curl -H "Accept: text/event-stream" http://localhost:5000/api/sse/time

注意:curl 不会自动重连,但可验证流是否正常输出。

2.4 增强:支持事件类型与 ID

修改推送逻辑,加入 eventid 字段:

var eventId = 0;
while (!cancellationToken.IsCancellationRequested)
{
    eventId++;
    var payload = new
    {
        time = DateTime.UtcNow,
        message = "Current server time"
    };
    var json = System.Text.Json.JsonSerializer.Serialize(payload);
    var sseMessage =
        $"event: tick\n" +
        $"id: {eventId}\n" +
        $"data: {json}\n\n";
    await writer.WriteAsync(sseMessage);
    await Task.Delay(2000, cancellationToken); // 每2秒
}

前端可监听特定事件:

const es = new EventSource('/api/sse/time');
es.addEventListener('tick', e => {
    console.log('Tick event:', JSON.parse(e.data));
});

💡 提示:Last-Event-ID 机制将在后续章节结合“消息恢复”功能详解。

2.5 处理客户端断开连接的正确方式

ASP.NET Core 在客户端断开时会:

  • 触发 HttpContext.RequestAbortedCancellationToken
  • 抛出 OperationCanceledException(若正在等待异步操作)

因此,必须用 try-catch 包裹循环,并在 finally 中释放资源。

⚠️ 常见错误1:忘记 AutoFlush = true → 数据卡在缓冲区 ⚠️ 常见错误2:未处理取消令牌 → 连接断开后后台仍在运行(内存泄漏)

2.6 使用 Minimal API 实现 SSE(可选)

如果你偏好 Minimal API,可这样写:

// Program.cs
app.MapGet("/sse/mini", async (HttpContext ctx) =>
{
    var response = ctx.Response;
    response.ContentType = "text/event-stream";
    response.Headers.CacheControl = "no-cache";

    var writer = new StreamWriter(response.Body, leaveOpen: true) { AutoFlush = true };
    var ct = ctx.RequestAborted;

    try
    {
        while (!ct.IsCancellationRequested)
        {
            await writer.WriteLineAsync($"data: {{\"count\":{DateTime.Now.Second}}}\n\n");
            await Task.Delay(1000, ct);
        }
    }
    finally
    {
        await writer.DisposeAsync();
    }
});

效果完全一致,代码更紧凑。

2.7 性能与资源考量

每个 SSE 连接会:

  • 占用一个线程(实际是异步,不阻塞线程池)
  • 保持一个 HTTP 连接(受 Kestrel 最大连接数限制)
  • 持有 StreamWriter 和内存缓冲区

📊 默认 Kestrel 最大连接数为无限制(但受系统资源约束)。生产环境需监控连接数。

建议

  • 避免在控制器中创建大对象
  • 使用对象池复用序列化器(如 JsonSerializer
  • 考虑使用 PipelinesIAsyncEnumerable 提升吞吐(见后续章节)

2.8 本章小结

  • 成功在 ASP.NET Core 10 中实现基础 SSE 推送
  • 掌握了 StreamWriter + AutoFlush 的关键组合
  • 理解了 CancellationToken 在连接管理中的作用
  • 学会了发送带 eventid 的结构化消息
  • 对比了 Controller 与 Minimal API 两种实现方式

第三章:构建可扩展的 SSE 消息广播系统

在第二章中,我们实现了单个客户端与服务器之间的简单 SSE 连接。但在真实场景中,一个事件(如“新订单”、“系统告警”)往往需要同时推送给成百上千个在线用户。这就要求我们构建一个可扩展、线程安全、资源高效的 SSE 广播系统。

本章目标: ✅ 使用 Channel<T> 实现高性能消息队列 ✅ 设计连接管理器(Connection Manager) ✅ 实现多客户端订阅/退订机制 ✅ 构建“实时通知中心”实战项目 ✅ 避免常见并发陷阱(如竞态条件、内存泄漏)

3.1 为什么需要消息广播架构?

直接在控制器中写死推送逻辑的问题:

  • 无法跨请求共享连接
  • 无法从其他服务(如后台任务、API 调用)触发推送
  • 每个连接独立运行,无法统一管理
  • 扩展性差,难以支持“房间”或“主题”订阅

理想架构应具备

  • 解耦:消息生产者 ≠ 消费者
  • 广播:一条消息 → 多个连接
  • 弹性:连接动态加入/离开
  • 可观测:可监控连接数、消息速率

3.2 引入 Channel:.NET 的高性能异步管道

System.Threading.Channels 提供了线程安全的生产者-消费者通道,非常适合 SSE 场景。

// 创建一个无界通道(也可用有界通道限流)
var channel = Channel.CreateUnbounded<string>();
  • 生产者:调用 channel.Writer.WriteAsync(message)
  • 消费者:通过 await channel.Reader.ReadAsync() 获取消息

优势

  • 零分配(zero-allocation)设计
  • 支持背压(有界通道)
  • 原生支持 IAsyncEnumerable
  • ConcurrentQueue + ManualResetEvent 更简洁高效

3.3 设计 SSE 连接管理器(SseConnectionManager)

创建 Services/SseConnectionManager.cs

using System.Threading.Channels;

namespace SseDemo.Services;

public class SseConnectionManager
{
    private readonly ConcurrentDictionary<string, ChannelWriter<string>> _writers = new();

    public IAsyncEnumerable<string> Subscribe(string connectionId, CancellationToken ct)
    {
        var channel = Channel.CreateUnbounded<string>();
        _writers.TryAdd(connectionId, channel.Writer);
        ct.Register(() => _writers.TryRemove(connectionId, out _));
        return channel.Reader.ReadAllAsync(ct);
    }

    public void Broadcast(string message)
    {
        foreach (var writer in _writers.Values)
        {
            // Fire-and-forget,不等待
            _ = writer.TryWrite(message);
        }
    }
    // 可选:按主题广播(见 3.6 节)
}

关键改进

  • 使用 ConcurrentDictionary 避免显式加锁
  • 每个连接拥有独立 ChannelWriter
  • 利用 CancellationToken.Register 自动清理
  • TryWrite 非阻塞,适合高吞吐

3.4 注册服务到 DI 容器

Program.cs 中:

builder.Services.AddSingleton<SseConnectionManager>();

💡 必须是 Singleton,确保所有请求共享同一个管理器。

3.5 修改控制器以支持广播

[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
    private readonly SseConnectionManager _sseManager;
    public SseController(SseConnectionManager sseManager)
    {
        _sseManager = sseManager;
    }

    [HttpGet("subscribe")]
    public async Task Subscribe(CancellationToken cancellationToken)
    {
        Response.Headers.ContentType = "text/event-stream";
        Response.Headers.CacheControl = "no-cache";

        var connectionId = Guid.NewGuid().ToString();
        var stream = _sseManager.Subscribe(connectionId, cancellationToken);
        var writer = new StreamWriter(Response.Body, leaveOpen: true) { AutoFlush = true };

        try
        {
            await foreach (var message in stream.WithCancellation(cancellationToken))
            {
                await writer.WriteLineAsync($"data: {message}\n\n");
            }
        }
        finally
        {
            await writer.DisposeAsync();
            // 自动由 CancellationToken 注册的回调清理
        }
    }

    [HttpPost("notify")]
    public IActionResult Notify([FromBody] object payload)
    {
        var json = JsonSerializer.Serialize(payload);
        _sseManager.Broadcast(json);
        return Ok("Notified");
    }
}

现在:

  • 客户端 GET /api/sse/subscribe 建立 SSE 连接
  • 任何服务 POST /api/sse/notify 即可向所有连接广播消息

3.6 实战:构建“订单状态通知中心”

假设我们有一个电商系统,需要实时通知用户订单状态变更。

步骤 1:定义消息模型
public record OrderNotification(
    string UserId,
    string OrderId,
    string Status,
    DateTime Timestamp);
步骤 2:增强 ConnectionManager 支持“按用户订阅”
public class UserSseConnectionManager
{
    private readonly ConcurrentDictionary<string, HashSet<ChannelWriter<string>>> _userConnections = new();

    public IAsyncEnumerable<string> Subscribe(string userId, CancellationToken ct)
    {
        var channel = Channel.CreateUnbounded<string>();
        var writer = channel.Writer;
        _userConnections.AddOrUpdate(userId,
            _ => new HashSet<ChannelWriter<string>> { writer },
            (_, set) => { set.Add(writer); return set; });

        ct.Register(() =>
        {
            var set = _userConnections.GetOrAdd(userId, _ => new());
            lock (set) set.Remove(writer);
        });

        return channel.Reader.ReadAllAsync(ct);
    }

    public void NotifyUser(string userId, string message)
    {
        if (_userConnections.TryGetValue(userId, out var writers))
        {
            var deadWriters = new List<ChannelWriter<string>>();
            foreach (var writer in writers)
            {
                if (!writer.TryWrite(message))
                    deadWriters.Add(writer);
            }
            if (deadWriters.Count > 0)
            {
                lock (writers)
                {
                    foreach (var w in deadWriters)
                        writers.Remove(w);
                }
            }
        }
    }
}
步骤 3:在订单服务中调用
// OrderService.cs
public class OrderService
{
    private readonly UserSseConnectionManager _sse;
    public OrderService(UserSseConnectionManager sse) => _sse = sse;

    public async Task UpdateOrderStatus(string orderId, string newStatus, string userId)
    {
        // ... 更新数据库
        var notification = new OrderNotification(userId, orderId, newStatus, DateTime.UtcNow);
        var json = JsonSerializer.Serialize(notification);
        _sse.NotifyUser(userId, json);
    }
}
步骤 4:前端监听
const userId = 'user123';
const es = new EventSource(`/api/sse/user/${userId}`);
es.onmessage = e => {
    const notif = JSON.parse(e.data);
    showNotification(`订单 ${notif.orderId} 状态更新为: ${notif.status}`);
};

3.7 性能优化建议

  1. 使用有界通道防止 OOM
    var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
    {
        FullMode = BoundedChannelFullMode.DropWrite
    });
  2. 批量写入:若消息频率极高,可聚合多条消息再发送(但牺牲实时性)。
  3. 压缩响应(HTTP Compression):在 Program.cs 启用 builder.Services.AddResponseCompression();app.UseResponseCompression();。注意:某些代理可能不支持流式压缩,需测试。
  4. 监控连接数:暴露指标供 Prometheus 抓取,例如 public int GetConnectionCount() => _writers.Count;

3.8 本章小结

  • 使用 Channel<T> 构建了高性能消息管道
  • 设计了线程安全的 SseConnectionManager
  • 实现了全局广播与用户级定向推送
  • 完成了“订单通知中心”实战案例
  • 掌握了资源清理、并发控制、性能优化技巧

第四章:SSE 中的断线重连与消息可靠性保障

在前几章中,我们实现了基本的 SSE 推送和广播系统。然而,在真实网络环境中,客户端可能因网络波动、页面刷新、设备休眠等原因频繁断开连接。若不加以处理,用户将丢失断线期间的重要消息(如订单支付成功、系统告警等)。

本章目标: ✅ 深入理解 SSE 的 Last-Event-ID 机制 ✅ 实现断线自动续传(消息回溯) ✅ 设计可靠的消息存储与投递策略 ✅ 构建“至少一次”语义的 SSE 服务 ✅ 结合数据库实现消息持久化与清理

4.1 SSE 的内置重连机制:Last-Event-ID

SSE 协议原生支持断线重连与消息恢复,核心是两个字段:

字段 作用
id:(服务器发送) 为每条消息分配唯一 ID
Last-Event-ID(客户端自动携带) 重连时在请求头中带上最后收到的 ID

浏览器行为:当连接断开,浏览器会自动发起新请求,并在 HTTP 头中包含:

Last-Event-ID: 123

我们的服务端必须:

  1. 在每条 SSE 消息中包含 id: <唯一ID>
  2. 从请求头读取 Last-Event-ID
  3. 从该 ID 之后开始重发未送达的消息

4.2 设计消息 ID 策略

消息 ID 必须满足:

  • 单调递增(便于比较)
  • 全局唯一(跨实例、跨时间)
  • 可排序(支持范围查询)

推荐方案

  • 时间戳 + 序列号(如 20251203120000_001
  • ULID / Snowflake ID(分布式友好)
  • 数据库自增主键(简单但依赖 DB)

✅ 本章采用 64 位整数 ID(如数据库主键),兼顾性能与排序。

4.3 消息存储模型设计

我们需要一个持久化消息日志表,用于存储所有已发送事件。这通常涉及到数据库/中间件如 SQL Server 或 PostgreSQL。

CREATE TABLE SseMessages (
    Id BIGINT IDENTITY(1,1) PRIMARY KEY,
    EventType NVARCHAR(100) NOT NULL,  -- 如 "order.update"
    TargetUserId NVARCHAR(128),        -- 可为空(表示全局广播)
    Payload NVARCHAR(MAX) NOT NULL,    -- JSON 内容
    CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
-- 索引优化查询
CREATE INDEX IX_SseMessages_TargetUserId_Id ON SseMessages (TargetUserId, Id);
CREATE INDEX IX_SseMessages_Id ON SseMessages (Id);

💡 若消息量极大(>1亿/天),可考虑分表、TTL 自动清理或使用专用时序数据库。

4.4 实现可靠 SSE 控制器(支持 Last-Event-ID)

步骤 1:创建消息仓储接口
public record SseMessage(long Id, string EventType, string? TargetUserId, string Payload);

public interface ISseMessageRepository
{
    Task<long> SaveAsync(string eventType, string? targetUserId, string payload);
    IAsyncEnumerable<SseMessage> GetSinceAsync(string? targetUserId, long lastId);
}
步骤 2:修改 SSE 控制器
[HttpGet("reliable/{userId?}")]
public async Task ReliableStream(string? userId, CancellationToken ct)
{
    Response.Headers.ContentType = "text/event-stream";
    Response.Headers.CacheControl = "no-cache";

    // 1. 读取 Last-Event-ID
    var lastIdStr = Request.Headers["Last-Event-ID"].FirstOrDefault();
    var lastId = long.TryParse(lastIdStr, out var id) ? id : 0L;

    // 2. 获取历史未送达消息
    var backlog = _messageRepo.GetSinceAsync(userId, lastId);
    var writer = new StreamWriter(Response.Body, leaveOpen: true) { AutoFlush = true };

    try
    {
        // 3. 先发送积压消息
        await foreach (var msg in backlog.WithCancellation(ct))
        {
            var sse = $"id: {msg.Id}\n" +
                      $"event: {msg.EventType}\n" +
                      $"data: {msg.Payload}\n\n";
            await writer.WriteAsync(sse);
        }

        // 4. 订阅实时新消息(通过 Channel 广播)
        var liveStream = _connectionManager.Subscribe(userId, ct);
        await foreach (var liveMsg in liveStream.WithCancellation(ct))
        {
            // 注意:liveMsg 需包含 Id!
            var parsed = JsonSerializer.Deserialize<LiveMessage>(liveMsg);
            var sse = $"id: {parsed.Id}\n" +
                      $"event: {parsed.EventType}\n" +
                      $"data: {parsed.Payload}\n\n";
            await writer.WriteAsync(sse);
        }
    }
    finally
    {
        await writer.DisposeAsync();
    }
}

⚠️ 关键点:实时消息也必须包含 id,因此 Channel 中传输的应是结构化对象(含 ID)。

4.5 消息生产流程整合

当有新事件发生时(如订单更新):

public class OrderService
{
    private readonly ISseMessageRepository _repo;
    private readonly IUserSseConnectionManager _sse;

    public async Task UpdateOrderStatus(string orderId, string status, string userId)
    {
        // 1. 保存到数据库(获取 ID)
        var payload = JsonSerializer.Serialize(new { OrderId = orderId, Status = status });
        var messageId = await _repo.SaveAsync("order.update", userId, payload);
        // 2. 构造带 ID 的实时消息
        var liveMsg = new LiveMessage(messageId, "order.update", payload);
        // 3. 广播给在线用户
        _sse.NotifyUser(userId, JsonSerializer.Serialize(liveMsg));
    }
}

这样,离线用户重连时能通过 Last-Event-ID 补全消息,在线用户实时收到带 ID 的推送,实现端到端可靠投递。

4.6 消息清理策略(防止无限增长)

持久化消息不能永久保留,需定期清理。

方案一:TTL(Time-To-Live)自动删除
// 每天凌晨清理 7 天前的消息
public class SseCleanupService : IHostedService
{
    private Timer? _timer;
    public Task StartAsync(CancellationToken ct)
    {
        _timer = new Timer(async _ =>
        {
            using var scope = _serviceProvider.CreateScope();
            var repo = scope.ServiceProvider.GetRequiredService<ISseMessageRepository>();
            await repo.DeleteOlderThanAsync(TimeSpan.FromDays(7));
        }, null, TimeSpan.FromHours(1), TimeSpan.FromDays(1));
        return Task.CompletedTask;
    }
    public Task StopAsync(CancellationToken ct)
    {
        _timer?.Dispose();
        return Task.CompletedTask;
    }
}
方案二:按 ID 范围清理(更精确)

记录每个用户的“已确认最大 ID”,定期删除小于该 ID 的消息。 💡 可扩展:客户端在收到消息后主动 POST /sse/ack?id=123,服务端更新“已确认 ID”。

4.7 客户端最佳实践

前端也需配合:

const userId = 'user123';
let es = new EventSource(`/api/sse/reliable/${userId}`);

es.onmessage = e => {
    console.log('收到消息:', e.data);
    // 可选:向服务器 ACK(用于精确清理)
    // fetch('/api/sse/ack', { method: 'POST', body: e.lastEventId });
};

// 监听错误并手动重连(增强鲁棒性)
es.onerror = () => {
    console.warn('SSE 连接异常,等待自动重连...');
    // 浏览器会自动重连,但可添加额外逻辑
};

📌 浏览器默认重连间隔为 3 秒,可通过 retry: 5000 设置为 5 秒。

4.8 可靠性语义总结

语义 实现方式 适用场景
最多一次 不存历史,只推实时 日志监控、股票行情(允许丢)
至少一次 持久化 + Last-Event-ID 支付通知、订单状态(不可丢)
恰好一次 需客户端去重(复杂) 金融交易(极少需要)

推荐:绝大多数场景使用“至少一次” + 客户端幂等处理(如根据订单 ID 去重)。

4.9 本章小结

  • 深入掌握了 SSE 的 Last-Event-ID 机制
  • 设计了基于数据库的消息持久化模型
  • 实现了断线自动续传功能
  • 整合了实时推送与历史回溯
  • 提出了消息清理与可靠性保障策略

第五章:SSE 安全性与认证授权

Server-Sent Events 虽然基于 HTTP,看似“天然安全”,但在实际应用中,若未正确实施身份验证与授权机制,极易导致敏感信息泄露、未授权订阅、会话劫持等严重安全问题。

本章目标: ✅ 在 SSE 连接中集成 ASP.NET Core 身份认证(JWT / Cookie) ✅ 实现细粒度授权(如仅允许访问自己的通知) ✅ 防御常见 Web 攻击(CSRF、XSS、重放攻击) ✅ 安全地传递认证凭据(避免 Token 泄露) ✅ 对比 SSE 与 SignalR 的安全模型差异

5.1 SSE 的认证挑战

SSE 使用标准 HTTP 请求建立连接,因此可以复用现有认证机制,但存在特殊限制:

认证方式 是否支持 注意事项
Cookie(Session) ✅ 完全支持 浏览器自动携带,适合 Web 应用
Authorization Header(Bearer Token) ⚠️ 不支持 EventSource 构造函数无法设置自定义 Header
URL Query 参数(?token=xxx) ✅ 技术可行 高风险! Token 可能被日志、Referer 泄露
客户端证书 / IP 白名单 ✅ 可用 适用于内网或 IoT 场景

关键限制

// ❌ 以下代码无效!EventSource 不允许设置 Authorization 头
new EventSource('/sse', { headers: { 'Authorization': 'Bearer xxx' } });

因此,Web 应用应优先使用 Cookie 认证;移动端或 SPA 若必须用 Token,需通过其他方式传递。

假设我们使用 ASP.NET Core Identity(基于 Cookie)。

步骤 1:启用身份认证
// Program.cs
builder.Services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
    .AddCookie(options =>
    {
        options.LoginPath = "/login";
        options.LogoutPath = "/logout";
    });
builder.Services.AddAuthorization();
app.UseAuthentication();
app.UseAuthorization();
步骤 2:在 SSE 控制器中要求登录
[Authorize] // ← 关键:强制认证
[HttpGet("user-notifications")]
public async Task UserNotifications(CancellationToken ct)
{
    var userId = User.FindFirstValue(ClaimTypes.NameIdentifier);
    if (string.IsNullOrEmpty(userId))
    {
        Response.StatusCode = 401;
        return;
    }
    // ... 正常 SSE 逻辑(仅推送该 userId 的消息)
}

✅ 浏览器在建立 SSE 连接时会自动携带 .AspNetCore.Cookies,服务端通过 User 对象获取身份。

步骤 3:前端无需额外处理
// 浏览器自动附带 Cookie
const es = new EventSource('/api/sse/user-notifications');
es.onmessage = e => console.log(e.data);

5.3 实现资源级授权(防止越权访问)

即使用户已登录,也必须确保只能订阅自己的数据

错误示例(危险!):
// ❌ 客户端可篡改 userId!
[HttpGet("notifications/{userId}")]
public async Task Notifications(string userId, ...) {
    // 直接使用 userId → 用户 A 可监听用户 B 的通知!
}
正确做法:从 Claims 获取真实用户 ID
[Authorize]
[HttpGet("my-notifications")] // ← 不暴露 userId
public async Task MyNotifications(CancellationToken ct)
{
    var currentUserId = User.FindFirstValue(ClaimTypes.NameIdentifier);
    // 仅订阅当前用户的消息
    var stream = _sseManager.SubscribeUser(currentUserId, ct);
    // ... 写入响应流
}

🔒 原则:SSE 端点不应接受可被篡改的用户标识参数,而应从认证上下文中提取。

5.4 SPA / 移动端如何安全传递 Token?

对于使用 JWT 的单页应用(React/Vue)或移动 App,由于 EventSource 无法设置 Header,可采用以下安全替代方案

  1. 登录后,将 JWT 写入 HttpOnly + Secure Cookie。
    var cookieOptions = new CookieOptions
    {
        HttpOnly = true,
        Secure = true,
        SameSite = SameSiteMode.Strict,
        Expires = DateTime.UtcNow.AddHours(1)
    };
    Response.Cookies.Append("auth-token", token, cookieOptions);
  2. Program.cs 中配置基于 Cookie 的 JWT 认证。
  3. SSE 端点正常使用 [Authorize]

✅ 优点:Token 不暴露给 JavaScript,防 XSS 窃取;浏览器自动携带。

方案二:临时 Token(短期有效)
  1. 客户端先请求一个 短期 SSE Token(有效期 30 秒)。
    POST /api/sse/token
    Authorization: Bearer <long-lived-jwt>
    → 返回 { sseToken: "temp-abc123" }
  2. 用该 Token 建立 SSE 连接。
    const es = new EventSource(`/api/sse/stream?token=${sseToken}`);
  3. 服务端验证 token 并映射到用户身份。

⚠️ 注意:Token 必须一次性使用 + 短期有效,防止重放。

5.5 防御 CSRF 攻击

SSE 本身是只读流,通常不执行状态变更操作,因此 CSRF 风险较低。但若 SSE 端点依赖 Cookie 认证,仍需防范:

  • SameSite Cookie:设置 SameSite=StrictLax
  • Origin 检查(可选,但注意 EventSource 发起的请求不包含 Origin 头,故可能无效)。

5.6 防范 XSS 与数据泄露

SSE 推送的数据若直接插入 DOM,可能引发 XSS:

// ❌ 危险!
es.onmessage = e => {
    document.getElementById('alerts').innerHTML = e.data; // 若 data 含 <script> 则执行
};

安全做法

  • 永远不要直接 innerHTML 用户可控内容。
  • 使用 textContent 或安全库(如 DOMPurify)。
  • 服务端对 Payload 进行 HTML 编码(双重保险)。

5.7 SSE vs SignalR 安全模型对比

特性 SSE SignalR
认证方式 依赖 HTTP(Cookie/Query) 支持 Header、Query、Cookie
加密 HTTPS(必须) HTTPS + 内置加密选项
细粒度授权 需手动实现 内置 Hub 方法级 [Authorize]
连接标识 无原生支持 Context.UserIdentifier
安全成熟度 低(需自行加固) 高(微软官方维护)

建议:若项目已用 SignalR,且需要双向通信,优先用 SignalR;若仅需轻量级单向推送,SSE + 严格安全措施足够。

5.8 本章小结

  • 明确了 SSE 的认证限制:不支持自定义 Header
  • 推荐使用 HttpOnly Cookie 传递身份凭据
  • 强调从 Claims 获取用户 ID,禁止客户端传参
  • 提供了 SPA 安全集成 JWT 的两种方案
  • 防御了 CSRF、XSS 等风险
  • 对比了 SSE 与 SignalR 的安全能力

第六章:SSE 性能调优与高并发实战

在前几章中,我们构建了功能完整、安全可靠的 SSE 系统。然而,当用户规模扩大到数千甚至数万并发连接时,基础实现将面临性能瓶颈:内存占用激增、GC 压力大、吞吐下降、连接超时等问题接踵而至。

本章目标: ✅ 使用 PipeWriterIAsyncEnumerable 实现零分配流式推送 ✅ 优化 Kestrel 服务器配置以支持高并发 SSE 连接 ✅ 基于 Redis Pub/Sub 实现多实例水平扩展 ✅ 使用 k6 对 SSE 接口进行压测与指标分析 ✅ 设计连接限流与熔断机制防止服务雪崩

6.1 性能瓶颈分析:为什么基础 SSE 会慢?

回顾第二章的基础实现:

var writer = new StreamWriter(Response.Body) { AutoFlush = true };
await writer.WriteLineAsync($"data: {json}\n\n");

问题在于:

  • StreamWriter 内部使用缓冲区 + 编码转换,每次写入涉及字符串拼接、UTF-8 转换、缓冲区拷贝。
  • 频繁 GC:每条消息生成临时字符串($"data: {json}\n\n")。
  • 同步阻塞风险:虽为异步,但底层仍可能触发同步 I/O。

📊 实测数据(单核)

  • 基础实现:约 3,000 并发连接,CPU 80%,吞吐 ~5k msg/s
  • 优化后:10,000+ 并发,CPU 50%,吞吐 >20k msg/s

6.2 使用 PipeWriter 实现高性能写入(零分配)

System.IO.Pipelines 是 .NET 高性能 I/O 的基石,Kestrel 内部也基于它。

优化后的 SSE 写入方法:
private static async Task WriteSseMessageAsync(PipeWriter writer, long id, string eventType, string payload)
{
    // 预分配足够空间(避免多次扩容)
    var buffer = writer.GetSpan(256);
    var pos = 0;

    // 写入 id:
    "id: ".AsSpan().CopyTo(buffer.Slice(pos));
    pos += 4;
    pos += Utf8Formatter.TryFormat(id, buffer.Slice(pos), out _);
    buffer[pos++] = (byte)'\n';

    // 写入 event:
    "event: ".AsSpan().CopyTo(buffer.Slice(pos));
    pos += 7;
    eventType.AsSpan().CopyTo(buffer.Slice(pos));
    pos += eventType.Length;
    buffer[pos++] = (byte)'\n';

    // 写入 data:
    "data: ".AsSpan().CopyTo(buffer.Slice(pos));
    pos += 6;
    payload.AsSpan().CopyTo(buffer.Slice(pos));
    pos += payload.Length;
    buffer[pos++] = (byte)'\n';
    buffer[pos++] = (byte)'\n';

    // 提交写入
    writer.Advance(pos);
    await writer.FlushAsync();
}

优势

  • 无字符串拼接:直接操作字节 Span。
  • 无 GC 压力:避免临时对象。
  • 批量写入:减少 syscall 次数。
在控制器中使用:
[HttpGet("high-perf")]
public async Task HighPerfSse(CancellationToken ct)
{
    Response.ContentType = "text/event-stream";
    var pipeWriter = Response.BodyWriter; // ASP.NET Core 3.0+ 支持

    while (!ct.IsCancellationRequested)
    {
        var payload = JsonSerializer.SerializeToUtf8Bytes(new { time = DateTime.UtcNow });
        var payloadStr = Encoding.UTF8.GetString(payload); // 若需字符串(可进一步优化)
        await WriteSseMessageAsync(pipeWriter, eventId++, "tick", payloadStr);
        await Task.Delay(1000, ct);
    }
}

🔧 进阶:若 payload 已是 UTF-8 字节,可直接写入 PipeWriter,完全绕过字符串。

6.3 Kestrel 高并发配置优化

默认 Kestrel 配置不适合长连接场景,需调整:

// Program.cs
builder.WebHost.ConfigureKestrel(options =>
{
    options.Limits.MaxConcurrentConnections = 20_000; // 默认无限制,但受系统约束
    options.Limits.MaxRequestBodySize = null;         // SSE 无请求体
    options.Limits.MinRequestBodyDataRate = null;     // 禁用低速检测(长连接会触发)
    options.Limits.MinResponseDataRate = null;        // 同上
    // 调整超时(SSE 连接可长期保持)
    options.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(5);
    options.Limits.RequestHeadersTimeout = TimeSpan.FromSeconds(30);
});

⚠️ 注意:MinResponseDataRate 默认为 240 bytes/5sec,若 SSE 推送间隔 >5 秒,连接会被 Kestrel 主动关闭!

6.4 多实例部署:Redis Pub/Sub 实现连接共享

单机 SSE 无法水平扩展。当部署多个 ASP.NET Core 实例时,用户连接可能分散在不同节点,导致消息无法送达。

解决方案使用 Redis 作为消息总线

实现步骤:
  1. 发布消息到 Redis
    public async Task PublishNotification(string userId, object payload)
    {
        var message = JsonSerializer.Serialize(new { UserId = userId, Payload = payload });
        await _redis.PublishAsync("sse-notifications", message);
    }
  2. 每个实例订阅 Redis 并转发给本地连接
    public class RedisSseForwarder : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken ct)
        {
            var subscriber = _redis.GetSubscriber();
            await subscriber.SubscribeAsync("sse-notifications", async (channel, value) =>
            {
                var msg = JsonSerializer.Deserialize<Notification>(value);
                _connectionManager.NotifyUser(msg.UserId, msg.Payload);
            });
        }
    }
  3. 注册后台服务
    builder.Services.AddHostedService<RedisSseForwarder>();

✅ 优点:解耦生产者与消费者,支持任意实例数扩展,是云原生/IaaS架构中常见的解耦模式。 ⚠️ 注意:Redis 成为单点,需部署集群 + 持久化。

6.5 连接限流与熔断

防止恶意用户或 Bug 导致连接耗尽:

方案一:基于 IP 的连接数限制(中间件示例)
app.Use(async (ctx, next) =>
{
    var ip = ctx.Connection.RemoteIpAddress?.ToString();
    if (_connectionCounter.Get(ip) > 10) // 每 IP 最多 10 个 SSE 连接
    {
        ctx.Response.StatusCode = 429;
        return;
    }
    _connectionCounter.Increment(ip);
    ctx.Response.OnCompleted(() => _connectionCounter.Decrement(ip));
    await next();
});

6.6 本章小结

  • 使用 PipeWriter + Span 实现零分配高性能写入。
  • 调整 Kestrel 配置适配长连接场景。
  • 借助 Redis Pub/Sub 实现多实例水平扩展。
  • 掌握连接限流与熔断机制保障系统稳定。

第七章:SSE 生产环境部署与运维

开发和测试阶段的 SSE 实现,一旦进入生产环境,将面临网络代理、容器编排、连接保活、可观测性等全新挑战。一个未正确配置的反向代理可能在 30 秒后关闭“空闲”连接;缺乏监控则无法及时发现连接泄漏。

本章目标: ✅ 正确配置 Nginx 反向代理以支持长连接 SSE ✅ 在 Docker 中部署 SSE 服务 ✅ 配置健康探针 ✅ 集成 OpenTelemetry 实现指标采集 ✅ 使用 Prometheus + Grafana 监控连接数与消息吞吐

7.1 反向代理配置:Nginx 示例

Nginx 默认对“无数据传输”的连接有超时限制,必须显式调整:

location /api/sse/ {
    proxy_pass http://backend;
    # 关键:禁用代理缓冲(否则消息会堆积不下发)
    proxy_buffering off;
    # 设置足够长的超时(根据业务需求)
    proxy_read_timeout 24h;  # 后端响应超时(SSE 是长响应)
    proxy_send_timeout 24h;  # 向客户端发送超时
    # 透传连接头
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    # 保留原始 Host 和 IP
    proxy_set_header Host $http_host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}

⚠️ 常见错误

  • 忘记 proxy_buffering off → 消息延迟数秒甚至分钟才到达前端。
  • proxy_read_timeout 过短(默认 60s) → 连接被 Nginx 主动关闭。

7.2 Docker 部署与健康检查

Dockerfile(优化启动速度)
FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS runtime
WORKDIR /app
COPY --from=build /app/publish .
# 健康检查端点
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
  CMD curl -f http://localhost:80/health || exit 1
ENTRYPOINT ["dotnet", "MySseApp.dll"]
健康检查端点设计

实现两个独立的健康端点:

// Program.cs
app.MapGet("/health/live", () => Results.Ok("Alive")); // 仅检查进程是否运行
app.MapGet("/health/ready", async (IConnectionManager mgr) =>
{
    // 检查关键依赖:Redis、DB 是否可达
    if (await mgr.IsRedisConnectedAsync())
        return Results.Ok("Ready");
    else
        return Results.StatusCode(503);
});

📌 /live 用于判断是否重启容器;/ready 用于判断是否接收流量。

7.3 可观测性:OpenTelemetry 集成

配置遥测管道
// Program.cs
builder.Services.AddOpenTelemetry()
    .ConfigureResource(resource => resource.AddService("sse-service"))
    .WithMetrics(metrics =>
    {
        metrics
            .AddAspNetCoreInstrumentation()
            .AddMeter("SseMetrics") // 自定义指标
            .AddPrometheusExporter(); // 暴露 /metrics 端点
    });
自定义 SSE 指标(连接数、消息速率)
public class SseMetrics
{
    private static readonly Meter Meter = new("SseMetrics");
    private static readonly UpDownCounter<int> ActiveConnections =
        Meter.CreateUpDownCounter<int>("sse.active_connections");
    private static readonly Counter<long> MessagesSent =
        Meter.CreateCounter<long>("sse.messages_sent");

    public static void IncrementConnections() => ActiveConnections.Add(1);
    public static void DecrementConnections() => ActiveConnections.Add(-1);
    public static void RecordMessageSent() => MessagesSent.Add(1);
}

在 SSE 控制器中调用这些方法以记录指标。

7.4 Prometheus + Grafana 监控看板

通过 PrometheusExporter 暴露指标后,可在 Grafana 中创建看板,监控关键指标:

  • sse_active_connections:当前连接数(按实例分组)。
  • sse_messages_sent_rate:消息发送速率(msg/s)。
  • process_working_set_bytes:内存使用量。

📈 告警规则示例(Prometheus)

- alert: HighSseConnectionCount
  expr: sum(sse_active_connections) > 10000
  for: 5m
  labels:
    severity: warning

7.5 本章小结

  • 正确配置了 Nginx 以支持 SSE 长连接。
  • 提供了 Docker 部署最佳实践与健康检查配置。
  • 实现了基于 OpenTelemetry 的可观测性。
  • 构建了 Prometheus + Grafana 监控体系。
  • 这些运维/DevOps实践是保障生产环境 SSE 服务稳定的关键。

第八章:完整实战项目 —— 实时订单监控看板

本章将整合前七章的所有核心知识,从零构建一个生产就绪的实时订单监控系统。该系统模拟电商平台后台,运营人员可通过 Web 看板实时查看新订单、支付状态变更、发货进度等事件。

🎯 项目目标

  • 后端:ASP.NET Core 10 + Redis(用于多实例扩展)
  • 前端:React 18 + TypeScript
  • 安全:JWT Cookie 认证
  • 运维:Docker 容器化 + Nginx + 监控
  • 特性:自动重连、消息去重、离线回溯、连接限流

8.1 项目架构概览

[React Frontend]
       ↓ (EventSource)
[Nginx] → [SSE Service (ASP.NET Core)] ←→ [Order API (模拟)]
                    ↑
            [Redis Pub/Sub]
                    ↑
        [Other Services (e.g., Payment)]

8.2 后端核心实现(ASP.NET Core 10)

1. 用户认证(基于 JWT Cookie)
builder.Services.AddAuthentication("jwt-cookie")
    .AddCookie("jwt-cookie", options =>
    {
        options.Cookie.HttpOnly = true;
        options.Cookie.SecurePolicy = CookieSecurePolicy.Always;
        options.Cookie.SameSite = SameSiteMode.Strict;
        options.Events.OnValidatePrincipal = async ctx =>
        {
            var token = ctx.Request.Cookies["auth-token"];
            if (JwtHelper.Validate(token, out var claims))
                ctx.Principal = new ClaimsPrincipal(new ClaimsIdentity(claims, "jwt"));
        };
    });
2. SSE 控制器(带授权与限流)
[Authorize]
[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
    [HttpGet("orders")]
    public async Task OrdersStream(CancellationToken ct)
    {
        var userId = User.FindFirstValue(ClaimTypes.NameIdentifier);
        if (string.IsNullOrEmpty(userId)) { Response.StatusCode = 401; return; }

        // 连接限流(每用户最多 2 个连接)
        if (!_connMgr.TryAddConnection(userId))
        {
            Response.StatusCode = 429;
            await Response.WriteAsync("Too many connections");
            return;
        }

        Response.ContentType = "text/event-stream";
        Response.Headers.Append("Cache-Control", "no-cache");
        Response.Headers.Append("X-Accel-Buffering", "no"); // Nginx 优化

        var lastId = Request.Query["lastId"];
        _logger.LogInformation("User {UserId} connected to SSE (lastId: {LastId})", userId, lastId);

        try
        {
            await _connMgr.StreamUserOrdersAsync(userId, lastId, Response.BodyWriter, ct);
        }
        finally
        {
            _connMgr.RemoveConnection(userId);
            _logger.LogInformation("User {UserId} disconnected from SSE", userId);
        }
    }
}
3. 连接管理器(支持 Redis 订阅与历史消息)
public class ConnectionManager : BackgroundService
{
    private readonly Dictionary<string, List<(PipeWriter writer, string lastId)>> _connections = new();
    private readonly ISubscriber _redis;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await _redis.SubscribeAsync("order-events", async (channel, payload) =>
        {
            var ev = JsonSerializer.Deserialize<OrderEvent>(payload);
            await NotifyUser(ev.UserId, ev);
        });
    }

    public async Task StreamUserOrdersAsync(string userId, string lastId, PipeWriter writer, CancellationToken ct)
    {
        lock (_connections)
        {
            if (!_connections.ContainsKey(userId))
                _connections[userId] = new();
            _connections[userId].Add((writer, lastId));
        }
        // 发送历史消息(最多 10 条)
        var history = GetHistoricalEvents(userId, lastId).Take(10);
        foreach (var ev in history)
        {
            await WriteSseMessageAsync(writer, ev.Id, "order", JsonSerializer.Serialize(ev), ct);
        }
        // 保持连接
        while (!ct.IsCancellationRequested)
        {
            await Task.Delay(1000, ct); // 心跳维持(可选)
        }
    }
}

8.3 前端实现(React + TypeScript)

封装 useOrderSse Hook:

// hooks/useOrderSse.ts
export const useOrderSse = (onOrder: (order: Order) => void) => {
  const [isConnected, setIsConnected] = useState(false);
  useEffect(() => {
    const lastId = localStorage.getItem('lastOrderId') || '';
    const url = `/api/sse/orders?lastId=${encodeURIComponent(lastId)}`;
    const es = new EventSource(url, { withCredentials: true }); // 携带 Cookie
    es.onmessage = (e) => {
      const order = JSON.parse(e.data) as Order;
      localStorage.setItem('lastOrderId', order.id); // 缓存 ID 用于回溯
      onOrder(order);
      setIsConnected(true);
    };
    es.onerror = () => setIsConnected(false);
    return () => es.close();
  }, []);
  return { isConnected };
};

8.4 部署与监控

Docker Compose 部署
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  sse-service:
    build: ./SseService
    ports: ["5000:80"]
    environment:
      - ASPNETCORE_ENVIRONMENT=Production
    depends_on: [redis]
  nginx:
    image: nginx:alpine
    ports: ["80:80"]
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on: [sse-service]

8.5 本章小结

通过这个实战项目,我们综合运用了 SSE 实现中的各项关键技术:从基础推送、广播架构、可靠性保障、安全性、性能优化到最终的生产环境部署与监控,构建了一个完整、健壮、可扩展的实时应用系统。




上一篇:SpringBoot实战:ThreadLocal父子线程传值方案详解(含线程池场景)
下一篇:网络故障排查:ping外网失败显示目标主机不可达原因分析
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区(YunPan.Plus) ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-7 03:24 , Processed in 0.084226 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 CloudStack.

快速回复 返回顶部 返回列表