如何设计一套真正能落地的分布式即时通讯系统?本文不只讨论“能跑起来”的 Demo,而是从连接接入、消息路由、存储模型、一致性语义、群聊扇出、限流熔断、可观测性、容灾与工程化交付等维度,完整拆解一套可支撑百万长连接、亿级日消息量的生产级 IM 架构。
一、为什么 IM 系统难做
即时通讯系统表面上只是“发消息”,本质上却是一个典型的高并发分布式系统问题:
- 它同时具备 海量长连接、高频小包、状态敏感、强交互实时性 的特征。
- 它要求消息在绝大多数场景下“看起来可靠、有序、及时”,但底层网络、服务实例、存储系统、消息队列都天然存在不确定性。
- 它既要满足单聊这种相对简单的点对点通信,也要处理万人群、离线消息、多端同步、撤回、已读回执、历史漫游、推送补偿等复杂业务。
很多文章把 IM 架构讲成了“WebSocket + Redis + Kafka + MySQL”的技术拼盘,但真正的难点从来不是组件名词,而是:
- 消息语义怎么定义
- 顺序边界怎么保证
- 路由状态如何维护
- 大群消息如何扇出
- 慢连接如何隔离
- 多端登录怎么同步
- 扩缩容和故障切换时怎么不雪崩
- 如何把这些能力工程化并稳定运行
这篇文章会围绕这些核心问题展开。
二、业务背景与目标设定
假设我们需要为一套企业协同 SaaS 平台构建 IM 能力,业务约束如下:
| 指标 |
目标 |
| DAU |
500 万 |
| 峰值在线 |
120 万 |
| 单日消息量 |
2 亿 |
| 峰值吞吐 |
30 万条/秒 |
| 消息实时性 |
单聊 P99 < 300ms,群聊 P99 < 800ms |
| 登录终端 |
Web / iOS / Android / 桌面端 |
| 可靠性目标 |
不丢消息,允许有限重复,可恢复 |
| 可用性目标 |
核心链路 99.95%+ |
从业务上看,系统至少要支持:
- 单聊
- 群聊
- 离线消息
- 多端同步
- 消息已读/未读
- 消息撤回
- 在线状态
- 历史消息查询
- 图片/文件消息
- 推送补偿
这决定了系统不能只追求吞吐,还必须把一致性语义说清楚。
三、先定义语义,再设计架构
在 IM 系统里,如果一开始不定义消息语义,后面所有设计都会变形。
3.1 必须明确的四个问题
1. 消息是否绝对不重复
答案通常是:做不到,也没必要。
工程上更合理的目标是:
- 服务端提供 至少一次投递
- 客户端和存储侧提供 幂等去重
- 用户体验上表现为“消息最终只展示一次”
这比盲目追求“精确一次”更现实,也更符合大规模分布式系统实践。
2. 顺序在哪个范围内保证
IM 中通常不追求全局顺序,而是保证:
- 单聊会话内有序
- 群聊在同一会话维度尽量有序
- 跨会话无需有序
换句话说,顺序保证的最小粒度是 conversationId,而不是整个系统。
3. 消息写入和消息投递谁先谁后
推荐原则:
- 先落库/落日志,再投递
- 如果在线投递失败,依然可以依赖离线拉取恢复
这样系统的“真相源”是存储和日志,而不是连接层内存。
4. 客户端以什么为准恢复消息
不是按时间戳,而是按 会话游标 cursor / seq 恢复。时间戳会受时钟漂移影响,游标才适合作为可靠恢复基准。
3.2 推荐的消息语义模型
对大多数企业 IM,推荐采用下面的语义定义:
- 发送语义:客户端发送成功,表示服务端已接收入队,不代表对端已收到
- 存储语义:服务端先持久化消息,再进行异步在线投递
- 投递语义:至少一次投递
- 展示语义:客户端按
msgId 幂等去重
- 顺序语义:会话内按
seq 单调递增展示
- 恢复语义:客户端断线重连后按最后确认的
seq 拉取增量消息
把这些规则先定下来,后续架构设计才有稳定边界。
四、生产级分布式 IM 总体架构
4.1 总体分层
(此处省略架构图,后续用表格说明各层职责。)
4.2 各层职责
| 层级 |
核心组件 |
职责 |
| 接入层 |
Connection Gateway |
维持 WebSocket 长连接、认证、心跳、限流 |
| 核心业务层 |
Message / Group / Session / Presence |
消息写入、会话序列生成、群成员解析、状态同步 |
| 事件层 |
Kafka |
流量削峰、异步投递、分区顺序、失败重试 |
| 状态层 |
Redis Cluster |
在线状态、连接路由、热点会话缓存 |
| 持久化层 |
MySQL / TiDB |
消息、会话游标、成员关系、回执信息 |
| 检索与审计 |
ES / ClickHouse |
全文搜索、运营分析、审计追踪 |
| 附件存储 |
MinIO / S3 |
图片、语音、文件 |
4.3 为什么要拆成“接入层 + 核心层”
原因非常关键:
- 连接层和业务层扩缩容诉求不同
- 连接层更接近网络栈
- 业务层更接近状态机
如果把连接、业务、存储都塞在一个服务里,早期看起来简单,后面几乎一定会在扩容、发布、故障隔离上付出巨大代价。
五、关键架构原理拆解
5.1 连接管理:如何维护百万长连接
长连接系统的第一个核心问题是:如何在分布式集群中准确找到用户当前连接在哪台机器上。
路由模型
推荐采用两级路由:
- 本地连接池
- 每个 Connection Gateway 维护本地
connId -> Connection 映射
- 全局路由表
- Redis 中维护
userId -> routeInfo
示例:
route:user:1001 -> {
"gatewayId": "conn-gw-12",
"connId": "c-8fa1b2",
"deviceId": "ios-001",
"lastActiveAt": 1715750000
}
为什么 Redis 路由表要带 TTL
因为连接是易失状态,网关异常退出时不一定有机会主动清理路由。TTL 可以兜底,避免脏路由长期存在。
典型做法:
- 心跳间隔:10 秒
- 路由 TTL:30 秒
- 每次收到客户端心跳时刷新 TTL
多端登录怎么处理
这取决于业务策略:
- 单端在线:新连接顶掉旧连接
- 多端在线:
userId -> deviceId -> routeInfo
- 同端单实例:同一个
deviceType 只允许一个活跃连接
生产上更常见的是:
因此路由模型最好从一开始就支持多设备维度。
5.2 消息链路:为什么不能“收到消息就直接推给对方”
因为那样消息只存在于内存,一旦服务异常就可能丢失。
生产上合理的主链路应该是:
Client A
-> Connection Gateway
-> Message Service
-> 分配会话序列号 seq
-> 持久化消息
-> 发送消息事件到 Kafka
-> Online Dispatcher 查找对端路由
-> 投递给目标 Gateway
-> Client B 收到消息
-> Client B ack
这条链路的核心思想是:
- 存储是事实来源
- 在线推送是加速路径
- 离线拉取是兜底路径
因此,即便在线投递失败,只要消息已经落库,客户端重连或主动同步后仍然能恢复。
5.3 顺序性:为什么要按会话分区
如果单聊消息的顺序被打乱,用户感知会非常明显。
推荐做法是:
- 以
conversationId 作为 Kafka 分区键
- 一个会话的所有消息进入同一分区
- 消费端按分区顺序处理
这样可以保证:
这也是 IM 系统中“局部有序、全局并行”的经典设计。
5.4 群聊扇出:系统真正的压力点
群聊的难点不在“存一条消息”,而在“把这条消息发给多少人”。
以一个 5 万人大群为例,一条群消息可能引发:
- 5 万条在线投递请求
- 5 万个未读计数更新
- 5 万个离线游标推进
- 若带推送,还会触发成千上万条 push 事件
如果处理不当,一个大群就足以把系统打穿。
群消息推荐架构
建议拆成两个阶段:
阶段一:消息入库
- 写一条群消息主记录
- 生成群会话
seq
- 发布
group_message_created 事件
阶段二:异步扇出
根据群规模走不同策略:
什么是写扩散、读扩散
写扩散
每个成员都生成一份收件箱或游标记录。
优点:
缺点:
读扩散
群消息只存一份,成员读取时按自己的阅读游标拉取。
优点:
缺点:
实战建议
- 500 人以下:写扩散
- 500 到 5000:混合扩散
- 5000 以上:读扩散
这是比较常见、也最符合成本收益比的工程策略。
5.5 在线状态:为什么 Presence 不应强一致
很多系统初期会执着于“用户在线状态必须绝对准确”,结果把状态系统做得非常重。
实际上 Presence 更适合采用 最终一致性:
- 心跳 10 秒刷新一次
- Redis TTL 30 秒
- 允许几秒钟误差
原因是:
- 在线状态本身只是辅助体验数据
- 过度追求强一致会引入过多同步开销
- 用户对“在线”状态本来就能接受短暂偏差
要把强一致预算留给真正关键的链路,比如消息写入和会话游标推进。
六、核心数据模型设计
6.1 核心实体
一套可演进的 IM 数据模型,至少应包含:
- 用户
user
- 会话
conversation
- 消息
message
- 会话成员
conversation_member
- 群组
group
- 群成员
group_member
- 已读游标
read_cursor
- 在线路由
route
- 投递回执
delivery_receipt
6.2 会话与消息表设计
conversation
CREATE TABLE conversation (
id BIGINT PRIMARY KEY,
biz_type TINYINT NOT NULL COMMENT '1=single,2=group',
owner_key VARCHAR(128) NOT NULL COMMENT '单聊去重键或群ID',
last_msg_id VARCHAR(64) DEFAULT NULL,
last_seq BIGINT NOT NULL DEFAULT 0,
last_message_at BIGINT NOT NULL DEFAULT 0,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
UNIQUE KEY uk_owner_key (biz_type, owner_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
message
CREATE TABLE message (
id BIGINT PRIMARY KEY,
msg_id VARCHAR(64) NOT NULL,
conversation_id BIGINT NOT NULL,
seq BIGINT NOT NULL,
sender_id BIGINT NOT NULL,
receiver_id BIGINT DEFAULT NULL,
group_id BIGINT DEFAULT NULL,
msg_type TINYINT NOT NULL,
payload JSON NOT NULL,
client_msg_id VARCHAR(64) DEFAULT NULL,
send_time BIGINT NOT NULL,
server_time BIGINT NOT NULL,
status TINYINT NOT NULL DEFAULT 1,
ext JSON DEFAULT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
UNIQUE KEY uk_msg_id (msg_id),
UNIQUE KEY uk_conv_seq (conversation_id, seq),
KEY idx_conv_time (conversation_id, server_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
read_cursor
CREATE TABLE read_cursor (
id BIGINT PRIMARY KEY,
user_id BIGINT NOT NULL,
conversation_id BIGINT NOT NULL,
read_seq BIGINT NOT NULL DEFAULT 0,
updated_at BIGINT NOT NULL,
UNIQUE KEY uk_user_conv (user_id, conversation_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
为什么需要 (conversation_id, seq) 唯一索引
因为 seq 是会话级顺序锚点,后续:
都会围绕它展开。
6.3 Redis 键设计
route:user:{userId}:{deviceId} -> routeInfo(JSON), TTL=30s
presence:user:{userId} -> online/offline, TTL=30s
conv:last_seq:{conversationId} -> current seq
group:members:{groupId} -> set(userId)
inbox:dirty:{userId} -> 待同步会话集合
dedup:client_msg:{clientMsgId} -> msgId, TTL=24h
Redis 里哪些是缓存,哪些是状态
这是工程上很容易混淆的一点。
group:members 属于缓存,可丢失可重建
route 和 presence 属于短期状态,丢失后可由心跳恢复
conv:last_seq 如果只存在 Redis 就危险,必须有数据库或日志兜底
dedup 是性能加速层,最终仍应依赖数据库唯一键做强兜底
七、生产级消息发送流程设计
7.1 单聊发送时序
1. 客户端发送 SendMessage(clientMsgId, toUserId, payload)
2. Gateway 做鉴权、限流、协议校验
3. Message Service 根据 userA-userB 定位或创建 conversation
4. 获取 conversation 下一个 seq
5. 写 message 表
6. 更新 conversation.last_seq / last_msg_id
7. 发送 message_created 事件到 Kafka
8. 返回发送成功给客户端(包含 msgId、seq、serverTime)
9. Dispatcher 异步查路由进行在线投递
10. 接收端 ack 后更新 delivery/read 状态
关键原则
- 客户端拿到发送成功回执时,消息必须已持久化
- 在线投递不应该阻塞发送成功回包
- 客户端消息状态应区分 sent / delivered / read
7.2 单聊发送的生产级 Go 示例
下面这段代码不是“玩具代码”,而是更接近生产实践的写法,体现了幂等、防重、会话顺序、事件投递四个关键点。
package message
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"time"
)
type SendMessageCmd struct {
SenderID int64
ReceiverID int64
ClientMsgID string
MsgType int32
Payload map[string]any
}
type SendMessageResp struct {
MsgID string
Seq int64
ServerTime int64
}
type ConversationRepo interface {
FindOrCreateSingle(ctx context.Context, tx *sql.Tx, senderID, receiverID int64) (Conversation, error)
NextSeq(ctx context.Context, tx *sql.Tx, conversationID int64) (int64, error)
UpdateLastMessage(ctx context.Context, tx *sql.Tx, conversationID int64, msgID string, seq int64, ts int64) error
}
type MessageRepo interface {
FindByClientMsgID(ctx context.Context, senderID int64, clientMsgID string) (*Message, error)
Insert(ctx context.Context, tx *sql.Tx, msg Message) error
}
type DedupStore interface {
Get(ctx context.Context, key string) (string, error)
SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error)
}
type EventPublisher interface {
PublishMessageCreated(ctx context.Context, evt MessageCreatedEvent) error
}
type TxManager interface {
WithTx(ctx context.Context, fn func(tx *sql.Tx) error) error
}
type Service struct {
txm TxManager
convRepo ConversationRepo
msgRepo MessageRepo
dedup DedupStore
publisher EventPublisher
idgen func() string
now func() time.Time
}
func (s *Service) SendSingleMessage(ctx context.Context, cmd SendMessageCmd) (*SendMessageResp, error) {
if cmd.ClientMsgID == "" {
return nil, errors.New("clientMsgId is required")
}
if cmd.SenderID == 0 || cmd.ReceiverID == 0 {
return nil, errors.New("sender or receiver is invalid")
}
dedupKey := fmt.Sprintf("dedup:client_msg:%d:%s", cmd.SenderID, cmd.ClientMsgID)
if msgID, err := s.dedup.Get(ctx, dedupKey); err == nil && msgID != "" {
oldMsg, findErr := s.msgRepo.FindByClientMsgID(ctx, cmd.SenderID, cmd.ClientMsgID)
if findErr == nil && oldMsg != nil {
return &SendMessageResp{
MsgID: oldMsg.MsgID,
Seq: oldMsg.Seq,
ServerTime: oldMsg.ServerTime,
}, nil
}
}
serverTime := s.now().UnixMilli()
msgID := s.idgen()
reserved, err := s.dedup.SetNX(ctx, dedupKey, msgID, 24*time.Hour)
if err != nil {
return nil, err
}
if !reserved {
oldMsg, findErr := s.msgRepo.FindByClientMsgID(ctx, cmd.SenderID, cmd.ClientMsgID)
if findErr == nil && oldMsg != nil {
return &SendMessageResp{
MsgID: oldMsg.MsgID,
Seq: oldMsg.Seq,
ServerTime: oldMsg.ServerTime,
}, nil
}
return nil, errors.New("duplicated request")
}
var resp SendMessageResp
payloadBytes, err := json.Marshal(cmd.Payload)
if err != nil {
return nil, err
}
err = s.txm.WithTx(ctx, func(tx *sql.Tx) error {
conv, err := s.convRepo.FindOrCreateSingle(ctx, tx, cmd.SenderID, cmd.ReceiverID)
if err != nil {
return err
}
seq, err := s.convRepo.NextSeq(ctx, tx, conv.ID)
if err != nil {
return err
}
msg := Message{
MsgID: msgID,
ConversationID: conv.ID,
Seq: seq,
SenderID: cmd.SenderID,
ReceiverID: sql.NullInt64{Int64: cmd.ReceiverID, Valid: true},
MsgType: cmd.MsgType,
Payload: payloadBytes,
ClientMsgID: sql.NullString{String: cmd.ClientMsgID, Valid: true},
SendTime: serverTime,
ServerTime: serverTime,
Status: 1,
CreatedAt: serverTime,
UpdatedAt: serverTime,
}
if err := s.msgRepo.Insert(ctx, tx, msg); err != nil {
return err
}
if err := s.convRepo.UpdateLastMessage(ctx, tx, conv.ID, msgID, seq, serverTime); err != nil {
return err
}
resp = SendMessageResp{
MsgID: msgID,
Seq: seq,
ServerTime: serverTime,
}
return nil
})
if err != nil {
return nil, err
}
evt := MessageCreatedEvent{
MsgID: resp.MsgID,
SenderID: cmd.SenderID,
ReceiverID: cmd.ReceiverID,
Seq: resp.Seq,
ServerTime: resp.ServerTime,
}
if err := s.publisher.PublishMessageCreated(ctx, evt); err != nil {
return nil, err
}
return &resp, nil
}
这段代码体现了几个生产级关键点:
- 使用
clientMsgId 做客户端重试幂等
- Redis 只做第一层去重,数据库仍是最终兜底
seq 在事务内分配,避免会话内乱序
- 发送成功回包发生在消息持久化之后
- 在线投递由异步事件承担,而非同步直推
7.3 群聊消息处理流程
群聊不能直接复用单聊逻辑,否则一旦群规模上来就会出问题。
推荐流程如下:
1. 校验发送者是否为群成员
2. 获取 groupConversationId
3. 申请群会话 seq
4. 写入群消息主表
5. 投递 group_message_created 事件
6. Fanout Worker 根据群规模选择策略:
- 小群:批量写入成员 inbox
- 大群:仅更新群游标,在线用户实时推送
7. 在线用户收到推送,离线用户后续增量同步
为什么群成员列表不能每次都查数据库
因为群聊是典型热点路径。
生产上通常这样做:
- MySQL 存权威成员关系
- Redis 缓存热点群成员集
- 本地缓存保存超热点群的短期快照
- 群成员变更时,通过事件广播失效缓存
八、连接层工程化设计
8.1 Gateway 不只是“收发 WebSocket”
一个成熟的 Connection Gateway 至少要承担:
- Token 鉴权
- 协议版本协商
- 心跳维持
- 连接级限流
- 包体大小校验
- 非法请求拦截
- 慢连接隔离
- 写队列背压
- 路由注册与续约
如果只是简单调用 ReadMessage / WriteMessage,在真实流量下很快就会暴露问题。
8.2 生产级连接对象设计
package gateway
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
)
type Connection struct {
userID int64
deviceID string
connID string
ws *websocket.Conn
writeCh chan []byte
closed atomic.Bool
lastHeartbeat atomic.Int64
closeOnce sync.Once
}
func NewConnection(userID int64, deviceID, connID string, ws *websocket.Conn) *Connection {
c := &Connection{
userID: userID,
deviceID: deviceID,
connID: connID,
ws: ws,
writeCh: make(chan []byte, 1024),
}
c.lastHeartbeat.Store(time.Now().Unix())
return c
}
func (c *Connection) Start(ctx context.Context, onClose func()) {
go c.readLoop(ctx, onClose)
go c.writeLoop(onClose)
}
func (c *Connection) TouchHeartbeat() {
c.lastHeartbeat.Store(time.Now().Unix())
}
func (c *Connection) IsHeartbeatExpired(timeout time.Duration) bool {
last := time.Unix(c.lastHeartbeat.Load(), 0)
return time.Since(last) > timeout
}
func (c *Connection) AsyncWrite(data []byte) error {
if c.closed.Load() {
return ErrConnClosed
}
select {
case c.writeCh <- data:
return nil
default:
c.Close()
return ErrBackpressure
}
}
func (c *Connection) Close() {
c.closeOnce.Do(func() {
c.closed.Store(true)
close(c.writeCh)
_ = c.ws.Close()
})
}
func (c *Connection) readLoop(ctx context.Context, onClose func()) {
defer func() {
c.Close()
onClose()
}()
_ = c.ws.SetReadDeadline(time.Now().Add(70 * time.Second))
c.ws.SetPongHandler(func(string) error {
c.TouchHeartbeat()
return c.ws.SetReadDeadline(time.Now().Add(70 * time.Second))
})
for {
select {
case <-ctx.Done():
return
default:
_, data, err := c.ws.ReadMessage()
if err != nil {
return
}
c.TouchHeartbeat()
_ = handleInboundPacket(ctx, c, data)
}
}
}
func (c *Connection) writeLoop(onClose func()) {
defer func() {
c.Close()
onClose()
}()
ticker := time.NewTicker(20 * time.Second)
defer ticker.Stop()
for {
select {
case data, ok := <-c.writeCh:
if !ok {
return
}
_ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := c.ws.WriteMessage(websocket.BinaryMessage, data); err != nil {
return
}
case <-ticker.C:
_ = c.ws.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
这段代码解决了什么问题
- 使用独立
writeLoop 防止业务线程被慢连接阻塞
- 使用
writeCh 做连接级背压
- 利用
Ping/Pong + ReadDeadline 发现假死连接
CloseOnce 避免重复关闭造成 panic
- 读写解耦,利于后续插入压缩、批量合并、统计打点
8.3 Gateway 路由注册与续约
type RouteInfo struct {
GatewayID string `json:"gatewayId"`
ConnID string `json:"connId"`
UserID int64 `json:"userId"`
DeviceID string `json:"deviceId"`
ConnectTime int64 `json:"connectTime"`
LastActiveAt int64 `json:"lastActiveAt"`
}
func (s *RouteService) Register(ctx context.Context, route RouteInfo) error {
key := fmt.Sprintf("route:user:%d:%s", route.UserID, route.DeviceID)
buf, _ := json.Marshal(route)
return s.redis.Set(ctx, key, buf, 30*time.Second).Err()
}
func (s *RouteService) Renew(ctx context.Context, userID int64, deviceID string) error {
key := fmt.Sprintf("route:user:%d:%s", userID, deviceID)
return s.redis.Expire(ctx, key, 30*time.Second).Err()
}
工程建议:
- 路由刷新不要每个心跳都写完整 JSON,可只
EXPIRE
- 超大规模下可做路由分片,避免单实例 Redis 热点
- 路由 key 必须可从
userId + deviceId 直接推导,便于快速查找
九、高并发与可扩展设计
9.1 水平扩展的关键原则
要让 IM 系统真正可扩展,必须满足三点:
- 连接层尽量无状态
- 消息处理按会话分区
- 热点操作可异步、可削峰、可降级
这三点基本决定了系统是否能从“十万在线”长到“百万在线”。
9.2 消息队列分区策略
Kafka 的分区键建议这样设计:
- 单聊:
conversationId
- 群聊:
groupId 或 groupConversationId
- 回执:
conversationId
这样可以获得:
分区数如何估算
经验公式:
分区数 = 峰值吞吐 / 单分区稳定吞吐 * 冗余系数
如果单分区稳定处理 3000 msg/s,峰值 30 万 msg/s,则:
300000 / 3000 = 100
100 * 1.5 ≈ 150 分区
实际生产中通常会向上取整,例如 192 或 256 分区,便于后续扩容。
9.3 限流、背压与降级
分布式 IM 最怕的不是“平均负载高”,而是“突发洪峰”。
必须分层做保护:
连接层保护
- 单连接 QPS 限流
- 包体大小限制
- 鉴权失败快速断开
- 写队列满时断开慢连接
业务层保护
- 单用户发送频控
- 单群发言频控
- 大群消息异步化
- 敏感词/风控异步旁路
存储层保护
用户体验降级
系统压力过大时,优先保核心能力:
- 单聊消息发送与接收
- 小群消息
- 历史同步
- 在线状态
- 输入中、回执、已读、推送补偿等弱核心能力
这类降级策略需要在设计时就预埋,不能等故障发生后临时补。
9.4 大群专项治理
大群是 IM 的系统性风险点,建议单独治理:
- 大群分级:普通群、千人群、万人群
- 大群消息走专用 Topic
- 大群未读计数异步聚合
- 大群只对在线用户实时推送,离线用户读扩散拉取
- 大群成员缓存独立分片
- 大群支持发言频控、管理员模式、慢启动广播
这类“业务规则”实际上是架构稳定性的组成部分。
十、可靠性设计:不丢、不乱、可恢复
10.1 幂等设计
幂等至少要做三层:
- 客户端层
- 缓存层
- 数据库层
这三层缺一不可。
10.2 ACK 设计
建议把 ACK 拆成三类:
server_ack
deliver_ack
read_ack
这样客户端才能准确展示:
为什么不要把“写入成功”和“对端收到”混为一谈
因为这两个语义完全不同:
- 前者依赖服务端落库成功
- 后者依赖目标客户端在线且网络可达
混在一起会导致前端状态机混乱,也会让排障非常困难。
10.3 断线重连与离线同步
客户端断线后,不能简单“重新登录然后继续收消息”,必须基于游标补偿。
推荐流程:
1. 客户端重连并上报 deviceId、token、lastAckedSeqMap
2. 服务端鉴权并重建 route
3. Sync Service 根据各会话 read/sync cursor 拉取增量消息
4. 按 seq 顺序回放
5. 恢复在线推送
为什么恢复时要按会话游标,而不是全局消息 ID
因为实际系统中:
- 不同会话处理并发度不同
- 全局 ID 不适合表达会话内顺序
- 已读与未读统计天然基于会话
因此恢复以“会话游标”为中心是更稳妥的设计。
10.4 Outbox 模式避免“库写成功但事件丢失”
这是很多系统容易忽略的可靠性漏洞。
如果发送流程是:
- 事务提交数据库
- 再调用 Kafka 发送消息
那么在步骤 1 和步骤 2 之间服务崩溃,就会出现:
用户看到“发送成功”,对端却永远收不到在线消息。
推荐使用 Outbox Pattern:
CREATE TABLE message_outbox (
id BIGINT PRIMARY KEY,
event_type VARCHAR(64) NOT NULL,
aggregate_id BIGINT NOT NULL,
payload JSON NOT NULL,
status TINYINT NOT NULL DEFAULT 0,
retry_count INT NOT NULL DEFAULT 0,
next_retry_at BIGINT NOT NULL DEFAULT 0,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
事务内同时写:
message
conversation
message_outbox
然后由独立 Outbox Worker 轮询并投递 Kafka,成功后再更新 outbox 状态。
这会让系统从“尽量可靠”提升到“可证明恢复”。
十一、可观测性与运维治理
11.1 监控指标设计
IM 系统不能只看 CPU、内存,核心是业务指标和链路指标。
接入层指标
- 当前在线连接数
- 新增连接数/断开连接数
- 心跳超时数
- 每连接平均入站/出站速率
- 写队列积压数
- 慢连接断开数
消息链路指标
- 消息发送 QPS
- 单聊/群聊占比
- Kafka 生产/消费延迟
- 在线投递成功率
- 离线同步耗时
- 消息端到端延迟 P50/P95/P99
存储与状态指标
- MySQL TPS / 慢 SQL
- Redis QPS / 命中率 / 热 key
- Outbox backlog
- DLQ 积压量
11.2 日志与 Trace
每条消息建议统一打通以下追踪字段:
traceId
msgId
clientMsgId
conversationId
seq
senderId
receiverId / groupId
gatewayId
这样在排查“消息丢了”“消息重复”“对端没收到”“顺序乱了”等问题时,才能从网关、业务、队列、存储一路串起来。
11.3 常见故障场景与处理
场景一:Gateway 节点重启,用户瞬时大规模掉线
解决:
- 客户端指数退避重连
- Gateway 发布前摘流
- 路由 TTL 自动失效
- Sync Service 重连后自动补拉增量消息
场景二:Redis 抖动导致路由查找失败
解决:
- Dispatcher 短暂重试
- 路由未命中则标记为离线投递
- 依赖离线同步补偿,而不是同步阻塞重试
场景三:Kafka 消费堆积
解决:
- 优先看热点群或某个异常分区
- 动态扩消费者
- 群聊与单聊拆 Topic
- 必要时关闭非核心事件消费
场景四:大群突发广播引发雪崩
解决:
- 大群专用 Topic
- 分批扇出
- 在线/离线分流
- push 补偿延后
- 未读异步聚合
十二、Kubernetes 部署与生产配置
12.1 Connection Gateway 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: im-connection-gateway
labels:
app: im-connection-gateway
spec:
replicas: 20
selector:
matchLabels:
app: im-connection-gateway
template:
metadata:
labels:
app: im-connection-gateway
spec:
terminationGracePeriodSeconds: 60
containers:
- name: gateway
image: registry.example.com/im/connection-gateway:1.0.0
ports:
- name: ws
containerPort: 8000
- name: grpc
containerPort: 9000
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: GOMAXPROCS
value: "4"
- name: REDIS_ADDR
value: "redis-cluster:6379"
- name: KAFKA_BROKERS
value: "kafka-0:9092,kafka-1:9092,kafka-2:9092"
resources:
requests:
cpu: "2"
memory: "2Gi"
limits:
cpu: "4"
memory: "6Gi"
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 20
periodSeconds: 10
lifecycle:
preStop:
exec:
command:
- /bin/sh
- -c
- |
wget -qO- http://127.0.0.1:8000/drain && sleep 30
为什么需要 preStop + drain
因为连接服务不是普通无状态 HTTP 服务。
如果 Pod 被直接杀掉,会导致:
推荐流程是:
- 先摘除新连接流量
- 让旧连接在 20 到 30 秒内自然迁移
- 再关闭进程
12.2 HPA 扩缩容建议
连接层不要只按 CPU 扩容,建议综合:
例如:
- 单 Pod 连接数阈值 2 万
- CPU > 65%
- 出站消息 > 8000 msg/s
三者中任意满足即可扩容。
12.3 Nginx/Envoy 接入建议
典型要求:
- 支持四层或七层 WebSocket 代理
- 打开长连接超时
- 开启 TLS
- 支持源地址透传
- 可做接入层限流
Nginx 示例:
worker_processes auto;
events {
worker_connections 65535;
use epoll;
}
http {
upstream im_gateway {
least_conn;
server 10.0.0.11:8000 max_fails=3 fail_timeout=10s;
server 10.0.0.12:8000 max_fails=3 fail_timeout=10s;
server 10.0.0.13:8000 max_fails=3 fail_timeout=10s;
}
server {
listen 443 ssl http2;
server_name im.example.com;
ssl_certificate /etc/nginx/cert/server.crt;
ssl_certificate_key /etc/nginx/cert/server.key;
location /ws {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
proxy_pass http://im_gateway;
}
}
}
十三、真实业务场景演进案例
下面给出一个更贴近实际的架构演进路径。
13.1 第一阶段:单体版 IM
架构:
- 一个 WebSocket 服务
- 一个 MySQL
- 一个 Redis
容量:
问题:
13.2 第二阶段:接入层与业务层拆分
架构:
- Connection Gateway 独立部署
- Message Service 独立部署
- Redis 做全局路由
收益:
容量:
13.3 第三阶段:引入消息队列与异步化
架构:
- 增加 Kafka
- 在线投递异步化
- 引入 Outbox
- 引入群聊 Fanout Worker
收益:
容量:
13.4 第四阶段:大群治理与多机房
架构:
- 单聊/群聊拆 Topic
- 大群单独处理链路
- 多机房部署
- 就近接入 + 异地同步
收益:
13.5 一个典型的线上案例
某企业协同平台的实际现象:
- 日活 400 万
- 峰值在线 110 万
- 高峰期晚间 8 点出现群通知洪峰
- 前期群消息直接写扩散,导致大群消息写库放大严重
优化过程:
- 将 1000 人以上群聊改为读扩散
- 大群消息切换到专用 Topic
- 在线投递按 Gateway 聚合批量推送
- 未读数改为异步聚合,不在主链路实时逐人更新
- push 通知延后 3 到 5 秒,避免与在线扇出抢资源
最终结果:
- 峰值 CPU 从 85% 降到 52%
- MySQL 写 TPS 降低 40%+
- 群聊 P99 延迟从 1.6s 降到 620ms
- 滚动发布期间重连洪峰下降约 60%
这说明 IM 系统优化的关键,不只是“换更强的中间件”,更重要的是 让消息语义与资源模型匹配。
十四、容量规划与压测思路
14.1 单 Pod 连接容量估算
影响单 Pod 连接数的核心因素包括:
- 每连接内存占用
- goroutine 数量
- 写缓冲大小
- 心跳频率
- TLS 开销
粗略估算公式:
单连接内存 = websocket对象 + 读写缓冲 + 路由状态 + goroutine栈
如果压测后平均每连接占用 25KB,6Gi 可用内存下,理论上:
6 * 1024 * 1024 KB / 25 KB ≈ 245760 连接
但生产不能按理论极限跑,通常只取 20% 到 30% 作为安全水位。
因此更合理的单 Pod 容量可能是:
具体值必须靠压测确认。
14.2 压测要覆盖哪些维度
不要只压“建连成功数”,还要同时压:
- 建连速度
- 心跳稳定性
- 单聊吞吐
- 群聊扇出
- 断线重连
- 发布摘流
- Redis/Kafka 故障注入
推荐压测场景
- 10 万连接稳定 30 分钟
- 每秒 1 万条单聊消息持续 15 分钟
- 每分钟 100 条万人群消息
- 5% 客户端随机断线重连
- 滚动重启 20% Gateway 实例
没有这些压测,所谓“百万在线架构”大概率只是理论架构。
十五、常见设计误区
误区一:Redis 能扛住就说明架构没问题
错。Redis 只是状态缓存和加速层,不应该承担消息事实来源。
误区二:消息不重复等于消息可靠
错。IM 更重要的是不丢、可恢复,而不是绝不重复。
误区三:所有群都使用同一套扇出逻辑
错。群规模不同,最优策略完全不同。
误区四:在线状态必须精确到秒
错。Presence 适合最终一致,别把系统预算浪费在次要目标上。
误区五:上线前只做功能测试,不做故障演练
错。IM 的大部分事故都发生在扩缩容、网络抖动、队列堆积、热点群冲击等非功能场景。
十六、文章总结
一套真正能落地的生产级 IM 系统,核心不是“用了哪些中间件”,而是是否把以下问题设计清楚了:
- 消息语义
- 顺序边界
- 存储真相源
- 在线投递与离线恢复
- 大群扇出策略
- 连接背压与慢连接隔离
- 幂等、ACK、Outbox 等可靠性机制
- 限流、降级、可观测性与发布治理
如果用一句话概括 IM 系统架构的本质,那就是:
以消息持久化为真相源,以会话顺序为核心约束,以连接层实时投递为加速通道,以异步化与工程治理支撑高并发和持续演进。
当系统从 1 万在线走向 100 万在线时,真正决定成败的不是某一个组件的极限性能,而是整个链路是否具备:
- 明确的语义边界
- 稳定的可扩展模型
- 可恢复的故障处理机制
- 可持续演进的工程结构
这才是分布式 IM 系统从“能用”走向“生产级”的关键。
本文由云栈社区编辑整理,更多分布式架构实践内容可关注云栈社区。