第一章:Server-Sent Events(SSE)基础概念与原理
1.1 什么是 Server-Sent Events?
Server-Sent Events(SSE)是一种基于 HTTP 的单向实时通信协议,允许服务器向客户端主动推送数据。与 WebSocket 不同,SSE 是单工通信(仅服务器 → 客户端),且基于标准 HTTP/1.1 或 HTTP/2,无需额外协议支持。
SSE 的核心特点:
- 使用
text/event-stream MIME 类型
- 自动重连机制(通过
retry 字段)
- 支持事件 ID(用于断线重传)
- 原生浏览器支持(
EventSource API)
- 轻量、简单、易于调试
1.2 SSE vs WebSocket vs 长轮询
| 特性 |
SSE |
WebSocket |
长轮询 |
| 通信方向 |
单向(服务端→客户端) |
双向 |
模拟双向(实际为请求-响应) |
| 协议 |
HTTP |
独立协议(ws/wss) |
HTTP |
| 浏览器支持 |
广泛(除 IE) |
广泛 |
全部 |
| 实现复杂度 |
低 |
中高 |
低 |
| 连接开销 |
低(复用 HTTP) |
中(需握手) |
高(频繁请求) |
| 适用场景 |
实时通知、日志流、股票行情 |
聊天、游戏、协同编辑 |
兼容旧浏览器 |
结论:若只需服务器向客户端推送数据,SSE 是比 WebSocket 更轻量、更易维护的选择。
1.3 SSE 协议格式详解
SSE 数据以纯文本形式通过 HTTP 响应流发送,格式如下:
event: message
data: {"time":"2025-12-03T10:00:00Z","msg":"Hello SSE"}
id: 123
retry: 5000
data::必须字段,表示消息内容(可多行,以空行结束)
event::可选,指定事件类型(客户端可监听特定事件)
id::消息 ID,用于断线后重传(浏览器自动在 Last-Event-ID 请求头中携带)
retry::重连间隔(毫秒)
⚠️ 注意:每行以\n结尾,整个消息块以\n\n结束。
1.4 浏览器端使用 EventSource
const eventSource = new EventSource('/sse/notifications');
eventSource.onmessage = (e) => {
console.log('默认事件:', e.data);
};
eventSource.addEventListener('alert', (e) => {
console.log('自定义 alert 事件:', e.data);
});
eventSource.onerror = (err) => {
console.error('SSE 连接错误:', err);
};
浏览器会自动处理:
- 自动重连(失败后按
retry 值重试)
- 断线后携带
Last-Event-ID 头
- 自动解析
data、event、id 字段
1.5 ASP.NET Core 对 SSE 的原生支持
ASP.NET Core 没有内置 SSE 中间件,但可通过以下方式实现:
- 手动设置
Content-Type: text/event-stream
- 使用
HttpResponse.BodyWriter 或 StreamWriter 写入流
- 利用
CancellationToken 监听连接中断
- 结合
IHostedService 或 Channel<T> 实现消息广播
关键点:SSE 本质是长连接 HTTP 响应流,需保持连接打开并持续写入。
1.6 本章小结
- SSE 是轻量级服务器推送技术,适用于单向实时场景
- 协议简单,基于 HTTP,天然穿越防火墙
- 浏览器原生支持
EventSource,自动处理重连与 ID
- ASP.NET Core 需手动实现 SSE 响应逻辑
第二章:在 ASP.NET Core 10 中实现基础 SSE 接口
本章目标:
✅ 创建一个可工作的 SSE 端点
✅ 理解 ASP.NET Core 中如何管理长连接流
✅ 实现基本的消息推送(如时间戳、通知)
✅ 正确处理客户端断开与资源释放
2.1 创建 ASP.NET Core 10 项目
首先,确保你已安装 .NET 10 SDK。使用 CLI 创建 Web API 项目:
dotnet new webapi -n SseDemo
cd SseDemo
删除 WeatherForecastController.cs(示例文件),我们将从零构建 SSE 控制器。
2.2 编写基础 SSE 控制器
创建 Controllers/SseController.cs:
using Microsoft.AspNetCore.Mvc;
namespace SseDemo.Controllers;
[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
[HttpGet("time")]
public async Task TimeStream(CancellationToken cancellationToken)
{
// 设置响应头
Response.Headers.ContentType = "text/event-stream";
Response.Headers.CacheControl = "no-cache";
Response.Headers.Connection = "keep-alive";
var writer = new StreamWriter(Response.Body, leaveOpen: true)
{
AutoFlush = true // 关键!确保数据立即发送
};
try
{
while (!cancellationToken.IsCancellationRequested)
{
var message = $"data: {{\"time\": \"{DateTime.UtcNow:O}\"}}\n\n";
await writer.WriteAsync(message);
// 每秒推送一次
await Task.Delay(1000, cancellationToken);
}
}
catch (OperationCanceledException)
{
// 客户端断开连接时会触发此异常(正常)
}
finally
{
await writer.DisposeAsync();
}
}
}
关键点解析:
- 返回类型为
Task:不使用 IActionResult,因为我们要直接操作 Response.Body。
- 设置响应头:
Content-Type: text/event-stream:告知浏览器这是 SSE 流
Cache-Control: no-cache:防止代理缓存
Connection: keep-alive:保持 TCP 连接
StreamWriter + AutoFlush = true:若不启用自动刷新,数据可能滞留在缓冲区,导致客户端收不到实时消息。
CancellationToken 监听中断:当用户关闭页面或网络断开,ASP.NET Core 会自动触发 cancellationToken,退出循环。
- 消息格式:使用标准 SSE 格式:
data: {...}\n\n
2.3 测试 SSE 接口
方法一:浏览器控制台测试
在 HTML 页面中添加:
<script>
const es = new EventSource('/api/sse/time');
es.onmessage = e => console.log(JSON.parse(e.data));
</script>
打开开发者工具 → Console,将看到每秒输出 UTC 时间。
方法二:使用 curl(仅查看原始流)
curl -H "Accept: text/event-stream" http://localhost:5000/api/sse/time
注意:curl 不会自动重连,但可验证流是否正常输出。
2.4 增强:支持事件类型与 ID
修改推送逻辑,加入 event 和 id 字段:
var eventId = 0;
while (!cancellationToken.IsCancellationRequested)
{
eventId++;
var payload = new
{
time = DateTime.UtcNow,
message = "Current server time"
};
var json = System.Text.Json.JsonSerializer.Serialize(payload);
var sseMessage =
$"event: tick\n" +
$"id: {eventId}\n" +
$"data: {json}\n\n";
await writer.WriteAsync(sseMessage);
await Task.Delay(2000, cancellationToken); // 每2秒
}
前端可监听特定事件:
const es = new EventSource('/api/sse/time');
es.addEventListener('tick', e => {
console.log('Tick event:', JSON.parse(e.data));
});
💡 提示:Last-Event-ID 机制将在后续章节结合“消息恢复”功能详解。
2.5 处理客户端断开连接的正确方式
ASP.NET Core 在客户端断开时会:
- 触发
HttpContext.RequestAborted 的 CancellationToken
- 抛出
OperationCanceledException(若正在等待异步操作)
因此,必须用 try-catch 包裹循环,并在 finally 中释放资源。
⚠️ 常见错误1:忘记 AutoFlush = true → 数据卡在缓冲区
⚠️ 常见错误2:未处理取消令牌 → 连接断开后后台仍在运行(内存泄漏)
2.6 使用 Minimal API 实现 SSE(可选)
如果你偏好 Minimal API,可这样写:
// Program.cs
app.MapGet("/sse/mini", async (HttpContext ctx) =>
{
var response = ctx.Response;
response.ContentType = "text/event-stream";
response.Headers.CacheControl = "no-cache";
var writer = new StreamWriter(response.Body, leaveOpen: true) { AutoFlush = true };
var ct = ctx.RequestAborted;
try
{
while (!ct.IsCancellationRequested)
{
await writer.WriteLineAsync($"data: {{\"count\":{DateTime.Now.Second}}}\n\n");
await Task.Delay(1000, ct);
}
}
finally
{
await writer.DisposeAsync();
}
});
效果完全一致,代码更紧凑。
2.7 性能与资源考量
每个 SSE 连接会:
- 占用一个线程(实际是异步,不阻塞线程池)
- 保持一个 HTTP 连接(受 Kestrel 最大连接数限制)
- 持有
StreamWriter 和内存缓冲区
📊 默认 Kestrel 最大连接数为无限制(但受系统资源约束)。生产环境需监控连接数。
建议:
- 避免在控制器中创建大对象
- 使用对象池复用序列化器(如
JsonSerializer)
- 考虑使用
Pipelines 或 IAsyncEnumerable 提升吞吐(见后续章节)
2.8 本章小结
- 成功在 ASP.NET Core 10 中实现基础 SSE 推送
- 掌握了
StreamWriter + AutoFlush 的关键组合
- 理解了
CancellationToken 在连接管理中的作用
- 学会了发送带
event 和 id 的结构化消息
- 对比了 Controller 与 Minimal API 两种实现方式
第三章:构建可扩展的 SSE 消息广播系统
在第二章中,我们实现了单个客户端与服务器之间的简单 SSE 连接。但在真实场景中,一个事件(如“新订单”、“系统告警”)往往需要同时推送给成百上千个在线用户。这就要求我们构建一个可扩展、线程安全、资源高效的 SSE 广播系统。
本章目标:
✅ 使用 Channel<T> 实现高性能消息队列
✅ 设计连接管理器(Connection Manager)
✅ 实现多客户端订阅/退订机制
✅ 构建“实时通知中心”实战项目
✅ 避免常见并发陷阱(如竞态条件、内存泄漏)
3.1 为什么需要消息广播架构?
直接在控制器中写死推送逻辑的问题:
- 无法跨请求共享连接
- 无法从其他服务(如后台任务、API 调用)触发推送
- 每个连接独立运行,无法统一管理
- 扩展性差,难以支持“房间”或“主题”订阅
理想架构应具备:
- 解耦:消息生产者 ≠ 消费者
- 广播:一条消息 → 多个连接
- 弹性:连接动态加入/离开
- 可观测:可监控连接数、消息速率
3.2 引入 Channel:.NET 的高性能异步管道
System.Threading.Channels 提供了线程安全的生产者-消费者通道,非常适合 SSE 场景。
// 创建一个无界通道(也可用有界通道限流)
var channel = Channel.CreateUnbounded<string>();
- 生产者:调用
channel.Writer.WriteAsync(message)
- 消费者:通过
await channel.Reader.ReadAsync() 获取消息
优势:
- 零分配(zero-allocation)设计
- 支持背压(有界通道)
- 原生支持
IAsyncEnumerable
- 比
ConcurrentQueue + ManualResetEvent 更简洁高效
3.3 设计 SSE 连接管理器(SseConnectionManager)
创建 Services/SseConnectionManager.cs:
using System.Threading.Channels;
namespace SseDemo.Services;
public class SseConnectionManager
{
private readonly ConcurrentDictionary<string, ChannelWriter<string>> _writers = new();
public IAsyncEnumerable<string> Subscribe(string connectionId, CancellationToken ct)
{
var channel = Channel.CreateUnbounded<string>();
_writers.TryAdd(connectionId, channel.Writer);
ct.Register(() => _writers.TryRemove(connectionId, out _));
return channel.Reader.ReadAllAsync(ct);
}
public void Broadcast(string message)
{
foreach (var writer in _writers.Values)
{
// Fire-and-forget,不等待
_ = writer.TryWrite(message);
}
}
// 可选:按主题广播(见 3.6 节)
}
关键改进:
- 使用
ConcurrentDictionary 避免显式加锁
- 每个连接拥有独立
ChannelWriter
- 利用
CancellationToken.Register 自动清理
TryWrite 非阻塞,适合高吞吐
3.4 注册服务到 DI 容器
在 Program.cs 中:
builder.Services.AddSingleton<SseConnectionManager>();
💡 必须是 Singleton,确保所有请求共享同一个管理器。
3.5 修改控制器以支持广播
[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
private readonly SseConnectionManager _sseManager;
public SseController(SseConnectionManager sseManager)
{
_sseManager = sseManager;
}
[HttpGet("subscribe")]
public async Task Subscribe(CancellationToken cancellationToken)
{
Response.Headers.ContentType = "text/event-stream";
Response.Headers.CacheControl = "no-cache";
var connectionId = Guid.NewGuid().ToString();
var stream = _sseManager.Subscribe(connectionId, cancellationToken);
var writer = new StreamWriter(Response.Body, leaveOpen: true) { AutoFlush = true };
try
{
await foreach (var message in stream.WithCancellation(cancellationToken))
{
await writer.WriteLineAsync($"data: {message}\n\n");
}
}
finally
{
await writer.DisposeAsync();
// 自动由 CancellationToken 注册的回调清理
}
}
[HttpPost("notify")]
public IActionResult Notify([FromBody] object payload)
{
var json = JsonSerializer.Serialize(payload);
_sseManager.Broadcast(json);
return Ok("Notified");
}
}
现在:
- 客户端 GET
/api/sse/subscribe 建立 SSE 连接
- 任何服务 POST
/api/sse/notify 即可向所有连接广播消息
3.6 实战:构建“订单状态通知中心”
假设我们有一个电商系统,需要实时通知用户订单状态变更。
步骤 1:定义消息模型
public record OrderNotification(
string UserId,
string OrderId,
string Status,
DateTime Timestamp);
步骤 2:增强 ConnectionManager 支持“按用户订阅”
public class UserSseConnectionManager
{
private readonly ConcurrentDictionary<string, HashSet<ChannelWriter<string>>> _userConnections = new();
public IAsyncEnumerable<string> Subscribe(string userId, CancellationToken ct)
{
var channel = Channel.CreateUnbounded<string>();
var writer = channel.Writer;
_userConnections.AddOrUpdate(userId,
_ => new HashSet<ChannelWriter<string>> { writer },
(_, set) => { set.Add(writer); return set; });
ct.Register(() =>
{
var set = _userConnections.GetOrAdd(userId, _ => new());
lock (set) set.Remove(writer);
});
return channel.Reader.ReadAllAsync(ct);
}
public void NotifyUser(string userId, string message)
{
if (_userConnections.TryGetValue(userId, out var writers))
{
var deadWriters = new List<ChannelWriter<string>>();
foreach (var writer in writers)
{
if (!writer.TryWrite(message))
deadWriters.Add(writer);
}
if (deadWriters.Count > 0)
{
lock (writers)
{
foreach (var w in deadWriters)
writers.Remove(w);
}
}
}
}
}
步骤 3:在订单服务中调用
// OrderService.cs
public class OrderService
{
private readonly UserSseConnectionManager _sse;
public OrderService(UserSseConnectionManager sse) => _sse = sse;
public async Task UpdateOrderStatus(string orderId, string newStatus, string userId)
{
// ... 更新数据库
var notification = new OrderNotification(userId, orderId, newStatus, DateTime.UtcNow);
var json = JsonSerializer.Serialize(notification);
_sse.NotifyUser(userId, json);
}
}
步骤 4:前端监听
const userId = 'user123';
const es = new EventSource(`/api/sse/user/${userId}`);
es.onmessage = e => {
const notif = JSON.parse(e.data);
showNotification(`订单 ${notif.orderId} 状态更新为: ${notif.status}`);
};
3.7 性能优化建议
- 使用有界通道防止 OOM
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.DropWrite
});
- 批量写入:若消息频率极高,可聚合多条消息再发送(但牺牲实时性)。
- 压缩响应(HTTP Compression):在
Program.cs 启用 builder.Services.AddResponseCompression(); 和 app.UseResponseCompression();。注意:某些代理可能不支持流式压缩,需测试。
- 监控连接数:暴露指标供 Prometheus 抓取,例如
public int GetConnectionCount() => _writers.Count;。
3.8 本章小结
- 使用
Channel<T> 构建了高性能消息管道
- 设计了线程安全的
SseConnectionManager
- 实现了全局广播与用户级定向推送
- 完成了“订单通知中心”实战案例
- 掌握了资源清理、并发控制、性能优化技巧
第四章:SSE 中的断线重连与消息可靠性保障
在前几章中,我们实现了基本的 SSE 推送和广播系统。然而,在真实网络环境中,客户端可能因网络波动、页面刷新、设备休眠等原因频繁断开连接。若不加以处理,用户将丢失断线期间的重要消息(如订单支付成功、系统告警等)。
本章目标:
✅ 深入理解 SSE 的 Last-Event-ID 机制
✅ 实现断线自动续传(消息回溯)
✅ 设计可靠的消息存储与投递策略
✅ 构建“至少一次”语义的 SSE 服务
✅ 结合数据库实现消息持久化与清理
4.1 SSE 的内置重连机制:Last-Event-ID
SSE 协议原生支持断线重连与消息恢复,核心是两个字段:
| 字段 |
作用 |
id:(服务器发送) |
为每条消息分配唯一 ID |
Last-Event-ID(客户端自动携带) |
重连时在请求头中带上最后收到的 ID |
浏览器行为:当连接断开,浏览器会自动发起新请求,并在 HTTP 头中包含:
Last-Event-ID: 123
我们的服务端必须:
- 在每条 SSE 消息中包含
id: <唯一ID>
- 从请求头读取
Last-Event-ID
- 从该 ID 之后开始重发未送达的消息
4.2 设计消息 ID 策略
消息 ID 必须满足:
- 单调递增(便于比较)
- 全局唯一(跨实例、跨时间)
- 可排序(支持范围查询)
推荐方案:
- 时间戳 + 序列号(如
20251203120000_001)
- ULID / Snowflake ID(分布式友好)
- 数据库自增主键(简单但依赖 DB)
✅ 本章采用 64 位整数 ID(如数据库主键),兼顾性能与排序。
4.3 消息存储模型设计
我们需要一个持久化消息日志表,用于存储所有已发送事件。这通常涉及到数据库/中间件如 SQL Server 或 PostgreSQL。
CREATE TABLE SseMessages (
Id BIGINT IDENTITY(1,1) PRIMARY KEY,
EventType NVARCHAR(100) NOT NULL, -- 如 "order.update"
TargetUserId NVARCHAR(128), -- 可为空(表示全局广播)
Payload NVARCHAR(MAX) NOT NULL, -- JSON 内容
CreatedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME()
);
-- 索引优化查询
CREATE INDEX IX_SseMessages_TargetUserId_Id ON SseMessages (TargetUserId, Id);
CREATE INDEX IX_SseMessages_Id ON SseMessages (Id);
💡 若消息量极大(>1亿/天),可考虑分表、TTL 自动清理或使用专用时序数据库。
4.4 实现可靠 SSE 控制器(支持 Last-Event-ID)
步骤 1:创建消息仓储接口
public record SseMessage(long Id, string EventType, string? TargetUserId, string Payload);
public interface ISseMessageRepository
{
Task<long> SaveAsync(string eventType, string? targetUserId, string payload);
IAsyncEnumerable<SseMessage> GetSinceAsync(string? targetUserId, long lastId);
}
步骤 2:修改 SSE 控制器
[HttpGet("reliable/{userId?}")]
public async Task ReliableStream(string? userId, CancellationToken ct)
{
Response.Headers.ContentType = "text/event-stream";
Response.Headers.CacheControl = "no-cache";
// 1. 读取 Last-Event-ID
var lastIdStr = Request.Headers["Last-Event-ID"].FirstOrDefault();
var lastId = long.TryParse(lastIdStr, out var id) ? id : 0L;
// 2. 获取历史未送达消息
var backlog = _messageRepo.GetSinceAsync(userId, lastId);
var writer = new StreamWriter(Response.Body, leaveOpen: true) { AutoFlush = true };
try
{
// 3. 先发送积压消息
await foreach (var msg in backlog.WithCancellation(ct))
{
var sse = $"id: {msg.Id}\n" +
$"event: {msg.EventType}\n" +
$"data: {msg.Payload}\n\n";
await writer.WriteAsync(sse);
}
// 4. 订阅实时新消息(通过 Channel 广播)
var liveStream = _connectionManager.Subscribe(userId, ct);
await foreach (var liveMsg in liveStream.WithCancellation(ct))
{
// 注意:liveMsg 需包含 Id!
var parsed = JsonSerializer.Deserialize<LiveMessage>(liveMsg);
var sse = $"id: {parsed.Id}\n" +
$"event: {parsed.EventType}\n" +
$"data: {parsed.Payload}\n\n";
await writer.WriteAsync(sse);
}
}
finally
{
await writer.DisposeAsync();
}
}
⚠️ 关键点:实时消息也必须包含 id,因此 Channel 中传输的应是结构化对象(含 ID)。
4.5 消息生产流程整合
当有新事件发生时(如订单更新):
public class OrderService
{
private readonly ISseMessageRepository _repo;
private readonly IUserSseConnectionManager _sse;
public async Task UpdateOrderStatus(string orderId, string status, string userId)
{
// 1. 保存到数据库(获取 ID)
var payload = JsonSerializer.Serialize(new { OrderId = orderId, Status = status });
var messageId = await _repo.SaveAsync("order.update", userId, payload);
// 2. 构造带 ID 的实时消息
var liveMsg = new LiveMessage(messageId, "order.update", payload);
// 3. 广播给在线用户
_sse.NotifyUser(userId, JsonSerializer.Serialize(liveMsg));
}
}
这样,离线用户重连时能通过 Last-Event-ID 补全消息,在线用户实时收到带 ID 的推送,实现端到端可靠投递。
4.6 消息清理策略(防止无限增长)
持久化消息不能永久保留,需定期清理。
方案一:TTL(Time-To-Live)自动删除
// 每天凌晨清理 7 天前的消息
public class SseCleanupService : IHostedService
{
private Timer? _timer;
public Task StartAsync(CancellationToken ct)
{
_timer = new Timer(async _ =>
{
using var scope = _serviceProvider.CreateScope();
var repo = scope.ServiceProvider.GetRequiredService<ISseMessageRepository>();
await repo.DeleteOlderThanAsync(TimeSpan.FromDays(7));
}, null, TimeSpan.FromHours(1), TimeSpan.FromDays(1));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken ct)
{
_timer?.Dispose();
return Task.CompletedTask;
}
}
方案二:按 ID 范围清理(更精确)
记录每个用户的“已确认最大 ID”,定期删除小于该 ID 的消息。
💡 可扩展:客户端在收到消息后主动 POST /sse/ack?id=123,服务端更新“已确认 ID”。
4.7 客户端最佳实践
前端也需配合:
const userId = 'user123';
let es = new EventSource(`/api/sse/reliable/${userId}`);
es.onmessage = e => {
console.log('收到消息:', e.data);
// 可选:向服务器 ACK(用于精确清理)
// fetch('/api/sse/ack', { method: 'POST', body: e.lastEventId });
};
// 监听错误并手动重连(增强鲁棒性)
es.onerror = () => {
console.warn('SSE 连接异常,等待自动重连...');
// 浏览器会自动重连,但可添加额外逻辑
};
📌 浏览器默认重连间隔为 3 秒,可通过 retry: 5000 设置为 5 秒。
4.8 可靠性语义总结
| 语义 |
实现方式 |
适用场景 |
| 最多一次 |
不存历史,只推实时 |
日志监控、股票行情(允许丢) |
| 至少一次 |
持久化 + Last-Event-ID |
支付通知、订单状态(不可丢) |
| 恰好一次 |
需客户端去重(复杂) |
金融交易(极少需要) |
✅ 推荐:绝大多数场景使用“至少一次” + 客户端幂等处理(如根据订单 ID 去重)。
4.9 本章小结
- 深入掌握了 SSE 的
Last-Event-ID 机制
- 设计了基于数据库的消息持久化模型
- 实现了断线自动续传功能
- 整合了实时推送与历史回溯
- 提出了消息清理与可靠性保障策略
第五章:SSE 安全性与认证授权
Server-Sent Events 虽然基于 HTTP,看似“天然安全”,但在实际应用中,若未正确实施身份验证与授权机制,极易导致敏感信息泄露、未授权订阅、会话劫持等严重安全问题。
本章目标:
✅ 在 SSE 连接中集成 ASP.NET Core 身份认证(JWT / Cookie)
✅ 实现细粒度授权(如仅允许访问自己的通知)
✅ 防御常见 Web 攻击(CSRF、XSS、重放攻击)
✅ 安全地传递认证凭据(避免 Token 泄露)
✅ 对比 SSE 与 SignalR 的安全模型差异
5.1 SSE 的认证挑战
SSE 使用标准 HTTP 请求建立连接,因此可以复用现有认证机制,但存在特殊限制:
| 认证方式 |
是否支持 |
注意事项 |
| Cookie(Session) |
✅ 完全支持 |
浏览器自动携带,适合 Web 应用 |
| Authorization Header(Bearer Token) |
⚠️ 不支持 |
EventSource 构造函数无法设置自定义 Header |
| URL Query 参数(?token=xxx) |
✅ 技术可行 |
高风险! Token 可能被日志、Referer 泄露 |
| 客户端证书 / IP 白名单 |
✅ 可用 |
适用于内网或 IoT 场景 |
关键限制:
// ❌ 以下代码无效!EventSource 不允许设置 Authorization 头
new EventSource('/sse', { headers: { 'Authorization': 'Bearer xxx' } });
因此,Web 应用应优先使用 Cookie 认证;移动端或 SPA 若必须用 Token,需通过其他方式传递。
5.2 使用 Cookie + Identity 实现用户认证
假设我们使用 ASP.NET Core Identity(基于 Cookie)。
步骤 1:启用身份认证
// Program.cs
builder.Services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
.AddCookie(options =>
{
options.LoginPath = "/login";
options.LogoutPath = "/logout";
});
builder.Services.AddAuthorization();
app.UseAuthentication();
app.UseAuthorization();
步骤 2:在 SSE 控制器中要求登录
[Authorize] // ← 关键:强制认证
[HttpGet("user-notifications")]
public async Task UserNotifications(CancellationToken ct)
{
var userId = User.FindFirstValue(ClaimTypes.NameIdentifier);
if (string.IsNullOrEmpty(userId))
{
Response.StatusCode = 401;
return;
}
// ... 正常 SSE 逻辑(仅推送该 userId 的消息)
}
✅ 浏览器在建立 SSE 连接时会自动携带 .AspNetCore.Cookies,服务端通过 User 对象获取身份。
步骤 3:前端无需额外处理
// 浏览器自动附带 Cookie
const es = new EventSource('/api/sse/user-notifications');
es.onmessage = e => console.log(e.data);
5.3 实现资源级授权(防止越权访问)
即使用户已登录,也必须确保只能订阅自己的数据。
错误示例(危险!):
// ❌ 客户端可篡改 userId!
[HttpGet("notifications/{userId}")]
public async Task Notifications(string userId, ...) {
// 直接使用 userId → 用户 A 可监听用户 B 的通知!
}
正确做法:从 Claims 获取真实用户 ID
[Authorize]
[HttpGet("my-notifications")] // ← 不暴露 userId
public async Task MyNotifications(CancellationToken ct)
{
var currentUserId = User.FindFirstValue(ClaimTypes.NameIdentifier);
// 仅订阅当前用户的消息
var stream = _sseManager.SubscribeUser(currentUserId, ct);
// ... 写入响应流
}
🔒 原则:SSE 端点不应接受可被篡改的用户标识参数,而应从认证上下文中提取。
5.4 SPA / 移动端如何安全传递 Token?
对于使用 JWT 的单页应用(React/Vue)或移动 App,由于 EventSource 无法设置 Header,可采用以下安全替代方案:
方案一:通过 Cookie 传递 Token(推荐)
- 登录后,将 JWT 写入 HttpOnly + Secure Cookie。
var cookieOptions = new CookieOptions
{
HttpOnly = true,
Secure = true,
SameSite = SameSiteMode.Strict,
Expires = DateTime.UtcNow.AddHours(1)
};
Response.Cookies.Append("auth-token", token, cookieOptions);
- 在
Program.cs 中配置基于 Cookie 的 JWT 认证。
- SSE 端点正常使用
[Authorize]。
✅ 优点:Token 不暴露给 JavaScript,防 XSS 窃取;浏览器自动携带。
方案二:临时 Token(短期有效)
- 客户端先请求一个 短期 SSE Token(有效期 30 秒)。
POST /api/sse/token
Authorization: Bearer <long-lived-jwt>
→ 返回 { sseToken: "temp-abc123" }
- 用该 Token 建立 SSE 连接。
const es = new EventSource(`/api/sse/stream?token=${sseToken}`);
- 服务端验证
token 并映射到用户身份。
⚠️ 注意:Token 必须一次性使用 + 短期有效,防止重放。
5.5 防御 CSRF 攻击
SSE 本身是只读流,通常不执行状态变更操作,因此 CSRF 风险较低。但若 SSE 端点依赖 Cookie 认证,仍需防范:
- SameSite Cookie:设置
SameSite=Strict 或 Lax。
- Origin 检查(可选,但注意
EventSource 发起的请求不包含 Origin 头,故可能无效)。
5.6 防范 XSS 与数据泄露
SSE 推送的数据若直接插入 DOM,可能引发 XSS:
// ❌ 危险!
es.onmessage = e => {
document.getElementById('alerts').innerHTML = e.data; // 若 data 含 <script> 则执行
};
安全做法:
- 永远不要直接
innerHTML 用户可控内容。
- 使用
textContent 或安全库(如 DOMPurify)。
- 服务端对
Payload 进行 HTML 编码(双重保险)。
5.7 SSE vs SignalR 安全模型对比
| 特性 |
SSE |
SignalR |
| 认证方式 |
依赖 HTTP(Cookie/Query) |
支持 Header、Query、Cookie |
| 加密 |
HTTPS(必须) |
HTTPS + 内置加密选项 |
| 细粒度授权 |
需手动实现 |
内置 Hub 方法级 [Authorize] |
| 连接标识 |
无原生支持 |
Context.UserIdentifier |
| 安全成熟度 |
低(需自行加固) |
高(微软官方维护) |
✅ 建议:若项目已用 SignalR,且需要双向通信,优先用 SignalR;若仅需轻量级单向推送,SSE + 严格安全措施足够。
5.8 本章小结
- 明确了 SSE 的认证限制:不支持自定义 Header
- 推荐使用 HttpOnly Cookie 传递身份凭据
- 强调从 Claims 获取用户 ID,禁止客户端传参
- 提供了 SPA 安全集成 JWT 的两种方案
- 防御了 CSRF、XSS 等风险
- 对比了 SSE 与 SignalR 的安全能力
第六章:SSE 性能调优与高并发实战
在前几章中,我们构建了功能完整、安全可靠的 SSE 系统。然而,当用户规模扩大到数千甚至数万并发连接时,基础实现将面临性能瓶颈:内存占用激增、GC 压力大、吞吐下降、连接超时等问题接踵而至。
本章目标:
✅ 使用 PipeWriter 和 IAsyncEnumerable 实现零分配流式推送
✅ 优化 Kestrel 服务器配置以支持高并发 SSE 连接
✅ 基于 Redis Pub/Sub 实现多实例水平扩展
✅ 使用 k6 对 SSE 接口进行压测与指标分析
✅ 设计连接限流与熔断机制防止服务雪崩
6.1 性能瓶颈分析:为什么基础 SSE 会慢?
回顾第二章的基础实现:
var writer = new StreamWriter(Response.Body) { AutoFlush = true };
await writer.WriteLineAsync($"data: {json}\n\n");
问题在于:
StreamWriter 内部使用缓冲区 + 编码转换,每次写入涉及字符串拼接、UTF-8 转换、缓冲区拷贝。
- 频繁 GC:每条消息生成临时字符串(
$"data: {json}\n\n")。
- 同步阻塞风险:虽为异步,但底层仍可能触发同步 I/O。
📊 实测数据(单核):
- 基础实现:约 3,000 并发连接,CPU 80%,吞吐 ~5k msg/s
- 优化后:10,000+ 并发,CPU 50%,吞吐 >20k msg/s
6.2 使用 PipeWriter 实现高性能写入(零分配)
System.IO.Pipelines 是 .NET 高性能 I/O 的基石,Kestrel 内部也基于它。
优化后的 SSE 写入方法:
private static async Task WriteSseMessageAsync(PipeWriter writer, long id, string eventType, string payload)
{
// 预分配足够空间(避免多次扩容)
var buffer = writer.GetSpan(256);
var pos = 0;
// 写入 id:
"id: ".AsSpan().CopyTo(buffer.Slice(pos));
pos += 4;
pos += Utf8Formatter.TryFormat(id, buffer.Slice(pos), out _);
buffer[pos++] = (byte)'\n';
// 写入 event:
"event: ".AsSpan().CopyTo(buffer.Slice(pos));
pos += 7;
eventType.AsSpan().CopyTo(buffer.Slice(pos));
pos += eventType.Length;
buffer[pos++] = (byte)'\n';
// 写入 data:
"data: ".AsSpan().CopyTo(buffer.Slice(pos));
pos += 6;
payload.AsSpan().CopyTo(buffer.Slice(pos));
pos += payload.Length;
buffer[pos++] = (byte)'\n';
buffer[pos++] = (byte)'\n';
// 提交写入
writer.Advance(pos);
await writer.FlushAsync();
}
✅ 优势:
- 无字符串拼接:直接操作字节 Span。
- 无 GC 压力:避免临时对象。
- 批量写入:减少 syscall 次数。
在控制器中使用:
[HttpGet("high-perf")]
public async Task HighPerfSse(CancellationToken ct)
{
Response.ContentType = "text/event-stream";
var pipeWriter = Response.BodyWriter; // ASP.NET Core 3.0+ 支持
while (!ct.IsCancellationRequested)
{
var payload = JsonSerializer.SerializeToUtf8Bytes(new { time = DateTime.UtcNow });
var payloadStr = Encoding.UTF8.GetString(payload); // 若需字符串(可进一步优化)
await WriteSseMessageAsync(pipeWriter, eventId++, "tick", payloadStr);
await Task.Delay(1000, ct);
}
}
🔧 进阶:若 payload 已是 UTF-8 字节,可直接写入 PipeWriter,完全绕过字符串。
6.3 Kestrel 高并发配置优化
默认 Kestrel 配置不适合长连接场景,需调整:
// Program.cs
builder.WebHost.ConfigureKestrel(options =>
{
options.Limits.MaxConcurrentConnections = 20_000; // 默认无限制,但受系统约束
options.Limits.MaxRequestBodySize = null; // SSE 无请求体
options.Limits.MinRequestBodyDataRate = null; // 禁用低速检测(长连接会触发)
options.Limits.MinResponseDataRate = null; // 同上
// 调整超时(SSE 连接可长期保持)
options.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(5);
options.Limits.RequestHeadersTimeout = TimeSpan.FromSeconds(30);
});
⚠️ 注意:MinResponseDataRate 默认为 240 bytes/5sec,若 SSE 推送间隔 >5 秒,连接会被 Kestrel 主动关闭!
6.4 多实例部署:Redis Pub/Sub 实现连接共享
单机 SSE 无法水平扩展。当部署多个 ASP.NET Core 实例时,用户连接可能分散在不同节点,导致消息无法送达。
解决方案:使用 Redis 作为消息总线。
实现步骤:
- 发布消息到 Redis
public async Task PublishNotification(string userId, object payload)
{
var message = JsonSerializer.Serialize(new { UserId = userId, Payload = payload });
await _redis.PublishAsync("sse-notifications", message);
}
- 每个实例订阅 Redis 并转发给本地连接
public class RedisSseForwarder : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
var subscriber = _redis.GetSubscriber();
await subscriber.SubscribeAsync("sse-notifications", async (channel, value) =>
{
var msg = JsonSerializer.Deserialize<Notification>(value);
_connectionManager.NotifyUser(msg.UserId, msg.Payload);
});
}
}
- 注册后台服务
builder.Services.AddHostedService<RedisSseForwarder>();
✅ 优点:解耦生产者与消费者,支持任意实例数扩展,是云原生/IaaS架构中常见的解耦模式。
⚠️ 注意:Redis 成为单点,需部署集群 + 持久化。
6.5 连接限流与熔断
防止恶意用户或 Bug 导致连接耗尽:
方案一:基于 IP 的连接数限制(中间件示例)
app.Use(async (ctx, next) =>
{
var ip = ctx.Connection.RemoteIpAddress?.ToString();
if (_connectionCounter.Get(ip) > 10) // 每 IP 最多 10 个 SSE 连接
{
ctx.Response.StatusCode = 429;
return;
}
_connectionCounter.Increment(ip);
ctx.Response.OnCompleted(() => _connectionCounter.Decrement(ip));
await next();
});
6.6 本章小结
- 使用
PipeWriter + Span 实现零分配高性能写入。
- 调整 Kestrel 配置适配长连接场景。
- 借助 Redis Pub/Sub 实现多实例水平扩展。
- 掌握连接限流与熔断机制保障系统稳定。
第七章:SSE 生产环境部署与运维
开发和测试阶段的 SSE 实现,一旦进入生产环境,将面临网络代理、容器编排、连接保活、可观测性等全新挑战。一个未正确配置的反向代理可能在 30 秒后关闭“空闲”连接;缺乏监控则无法及时发现连接泄漏。
本章目标:
✅ 正确配置 Nginx 反向代理以支持长连接 SSE
✅ 在 Docker 中部署 SSE 服务
✅ 配置健康探针
✅ 集成 OpenTelemetry 实现指标采集
✅ 使用 Prometheus + Grafana 监控连接数与消息吞吐
7.1 反向代理配置:Nginx 示例
Nginx 默认对“无数据传输”的连接有超时限制,必须显式调整:
location /api/sse/ {
proxy_pass http://backend;
# 关键:禁用代理缓冲(否则消息会堆积不下发)
proxy_buffering off;
# 设置足够长的超时(根据业务需求)
proxy_read_timeout 24h; # 后端响应超时(SSE 是长响应)
proxy_send_timeout 24h; # 向客户端发送超时
# 透传连接头
proxy_set_header Connection '';
proxy_http_version 1.1;
# 保留原始 Host 和 IP
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
⚠️ 常见错误:
- 忘记
proxy_buffering off → 消息延迟数秒甚至分钟才到达前端。
proxy_read_timeout 过短(默认 60s) → 连接被 Nginx 主动关闭。
7.2 Docker 部署与健康检查
Dockerfile(优化启动速度)
FROM mcr.microsoft.com/dotnet/aspnet:10.0 AS runtime
WORKDIR /app
COPY --from=build /app/publish .
# 健康检查端点
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD curl -f http://localhost:80/health || exit 1
ENTRYPOINT ["dotnet", "MySseApp.dll"]
健康检查端点设计
实现两个独立的健康端点:
// Program.cs
app.MapGet("/health/live", () => Results.Ok("Alive")); // 仅检查进程是否运行
app.MapGet("/health/ready", async (IConnectionManager mgr) =>
{
// 检查关键依赖:Redis、DB 是否可达
if (await mgr.IsRedisConnectedAsync())
return Results.Ok("Ready");
else
return Results.StatusCode(503);
});
📌 /live 用于判断是否重启容器;/ready 用于判断是否接收流量。
7.3 可观测性:OpenTelemetry 集成
配置遥测管道
// Program.cs
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("sse-service"))
.WithMetrics(metrics =>
{
metrics
.AddAspNetCoreInstrumentation()
.AddMeter("SseMetrics") // 自定义指标
.AddPrometheusExporter(); // 暴露 /metrics 端点
});
自定义 SSE 指标(连接数、消息速率)
public class SseMetrics
{
private static readonly Meter Meter = new("SseMetrics");
private static readonly UpDownCounter<int> ActiveConnections =
Meter.CreateUpDownCounter<int>("sse.active_connections");
private static readonly Counter<long> MessagesSent =
Meter.CreateCounter<long>("sse.messages_sent");
public static void IncrementConnections() => ActiveConnections.Add(1);
public static void DecrementConnections() => ActiveConnections.Add(-1);
public static void RecordMessageSent() => MessagesSent.Add(1);
}
在 SSE 控制器中调用这些方法以记录指标。
7.4 Prometheus + Grafana 监控看板
通过 PrometheusExporter 暴露指标后,可在 Grafana 中创建看板,监控关键指标:
sse_active_connections:当前连接数(按实例分组)。
sse_messages_sent_rate:消息发送速率(msg/s)。
process_working_set_bytes:内存使用量。
📈 告警规则示例(Prometheus):
- alert: HighSseConnectionCount
expr: sum(sse_active_connections) > 10000
for: 5m
labels:
severity: warning
7.5 本章小结
- 正确配置了 Nginx 以支持 SSE 长连接。
- 提供了 Docker 部署最佳实践与健康检查配置。
- 实现了基于 OpenTelemetry 的可观测性。
- 构建了 Prometheus + Grafana 监控体系。
- 这些运维/DevOps实践是保障生产环境 SSE 服务稳定的关键。
第八章:完整实战项目 —— 实时订单监控看板
本章将整合前七章的所有核心知识,从零构建一个生产就绪的实时订单监控系统。该系统模拟电商平台后台,运营人员可通过 Web 看板实时查看新订单、支付状态变更、发货进度等事件。
🎯 项目目标:
- 后端:ASP.NET Core 10 + Redis(用于多实例扩展)
- 前端:React 18 + TypeScript
- 安全:JWT Cookie 认证
- 运维:Docker 容器化 + Nginx + 监控
- 特性:自动重连、消息去重、离线回溯、连接限流
8.1 项目架构概览
[React Frontend]
↓ (EventSource)
[Nginx] → [SSE Service (ASP.NET Core)] ←→ [Order API (模拟)]
↑
[Redis Pub/Sub]
↑
[Other Services (e.g., Payment)]
8.2 后端核心实现(ASP.NET Core 10)
1. 用户认证(基于 JWT Cookie)
builder.Services.AddAuthentication("jwt-cookie")
.AddCookie("jwt-cookie", options =>
{
options.Cookie.HttpOnly = true;
options.Cookie.SecurePolicy = CookieSecurePolicy.Always;
options.Cookie.SameSite = SameSiteMode.Strict;
options.Events.OnValidatePrincipal = async ctx =>
{
var token = ctx.Request.Cookies["auth-token"];
if (JwtHelper.Validate(token, out var claims))
ctx.Principal = new ClaimsPrincipal(new ClaimsIdentity(claims, "jwt"));
};
});
2. SSE 控制器(带授权与限流)
[Authorize]
[ApiController]
[Route("api/[controller]")]
public class SseController : ControllerBase
{
[HttpGet("orders")]
public async Task OrdersStream(CancellationToken ct)
{
var userId = User.FindFirstValue(ClaimTypes.NameIdentifier);
if (string.IsNullOrEmpty(userId)) { Response.StatusCode = 401; return; }
// 连接限流(每用户最多 2 个连接)
if (!_connMgr.TryAddConnection(userId))
{
Response.StatusCode = 429;
await Response.WriteAsync("Too many connections");
return;
}
Response.ContentType = "text/event-stream";
Response.Headers.Append("Cache-Control", "no-cache");
Response.Headers.Append("X-Accel-Buffering", "no"); // Nginx 优化
var lastId = Request.Query["lastId"];
_logger.LogInformation("User {UserId} connected to SSE (lastId: {LastId})", userId, lastId);
try
{
await _connMgr.StreamUserOrdersAsync(userId, lastId, Response.BodyWriter, ct);
}
finally
{
_connMgr.RemoveConnection(userId);
_logger.LogInformation("User {UserId} disconnected from SSE", userId);
}
}
}
3. 连接管理器(支持 Redis 订阅与历史消息)
public class ConnectionManager : BackgroundService
{
private readonly Dictionary<string, List<(PipeWriter writer, string lastId)>> _connections = new();
private readonly ISubscriber _redis;
protected override async Task ExecuteAsync(CancellationToken ct)
{
await _redis.SubscribeAsync("order-events", async (channel, payload) =>
{
var ev = JsonSerializer.Deserialize<OrderEvent>(payload);
await NotifyUser(ev.UserId, ev);
});
}
public async Task StreamUserOrdersAsync(string userId, string lastId, PipeWriter writer, CancellationToken ct)
{
lock (_connections)
{
if (!_connections.ContainsKey(userId))
_connections[userId] = new();
_connections[userId].Add((writer, lastId));
}
// 发送历史消息(最多 10 条)
var history = GetHistoricalEvents(userId, lastId).Take(10);
foreach (var ev in history)
{
await WriteSseMessageAsync(writer, ev.Id, "order", JsonSerializer.Serialize(ev), ct);
}
// 保持连接
while (!ct.IsCancellationRequested)
{
await Task.Delay(1000, ct); // 心跳维持(可选)
}
}
}
8.3 前端实现(React + TypeScript)
封装 useOrderSse Hook:
// hooks/useOrderSse.ts
export const useOrderSse = (onOrder: (order: Order) => void) => {
const [isConnected, setIsConnected] = useState(false);
useEffect(() => {
const lastId = localStorage.getItem('lastOrderId') || '';
const url = `/api/sse/orders?lastId=${encodeURIComponent(lastId)}`;
const es = new EventSource(url, { withCredentials: true }); // 携带 Cookie
es.onmessage = (e) => {
const order = JSON.parse(e.data) as Order;
localStorage.setItem('lastOrderId', order.id); // 缓存 ID 用于回溯
onOrder(order);
setIsConnected(true);
};
es.onerror = () => setIsConnected(false);
return () => es.close();
}, []);
return { isConnected };
};
8.4 部署与监控
Docker Compose 部署
version: '3.8'
services:
redis:
image: redis:7-alpine
ports: ["6379:6379"]
sse-service:
build: ./SseService
ports: ["5000:80"]
environment:
- ASPNETCORE_ENVIRONMENT=Production
depends_on: [redis]
nginx:
image: nginx:alpine
ports: ["80:80"]
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
depends_on: [sse-service]
8.5 本章小结
通过这个实战项目,我们综合运用了 SSE 实现中的各项关键技术:从基础推送、广播架构、可靠性保障、安全性、性能优化到最终的生产环境部署与监控,构建了一个完整、健壮、可扩展的实时应用系统。