找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3673

积分

0

好友

487

主题
发表于 1 小时前 | 查看: 2| 回复: 0

如何设计一套真正能落地的分布式即时通讯系统?本文不只讨论“能跑起来”的 Demo,而是从连接接入、消息路由、存储模型、一致性语义、群聊扇出、限流熔断、可观测性、容灾与工程化交付等维度,完整拆解一套可支撑百万长连接、亿级日消息量的生产级 IM 架构。

一、为什么 IM 系统难做

即时通讯系统表面上只是“发消息”,本质上却是一个典型的高并发分布式系统问题:

  • 它同时具备 海量长连接高频小包状态敏感强交互实时性 的特征。
  • 它要求消息在绝大多数场景下“看起来可靠、有序、及时”,但底层网络、服务实例、存储系统、消息队列都天然存在不确定性。
  • 它既要满足单聊这种相对简单的点对点通信,也要处理万人群、离线消息、多端同步、撤回、已读回执、历史漫游、推送补偿等复杂业务。

很多文章把 IM 架构讲成了“WebSocket + Redis + Kafka + MySQL”的技术拼盘,但真正的难点从来不是组件名词,而是:

  1. 消息语义怎么定义
  2. 顺序边界怎么保证
  3. 路由状态如何维护
  4. 大群消息如何扇出
  5. 慢连接如何隔离
  6. 多端登录怎么同步
  7. 扩缩容和故障切换时怎么不雪崩
  8. 如何把这些能力工程化并稳定运行

这篇文章会围绕这些核心问题展开。

二、业务背景与目标设定

假设我们需要为一套企业协同 SaaS 平台构建 IM 能力,业务约束如下:

指标 目标
DAU 500 万
峰值在线 120 万
单日消息量 2 亿
峰值吞吐 30 万条/秒
消息实时性 单聊 P99 < 300ms,群聊 P99 < 800ms
登录终端 Web / iOS / Android / 桌面端
可靠性目标 不丢消息,允许有限重复,可恢复
可用性目标 核心链路 99.95%+

从业务上看,系统至少要支持:

  • 单聊
  • 群聊
  • 离线消息
  • 多端同步
  • 消息已读/未读
  • 消息撤回
  • 在线状态
  • 历史消息查询
  • 图片/文件消息
  • 推送补偿

这决定了系统不能只追求吞吐,还必须把一致性语义说清楚。

三、先定义语义,再设计架构

在 IM 系统里,如果一开始不定义消息语义,后面所有设计都会变形。

3.1 必须明确的四个问题

1. 消息是否绝对不重复

答案通常是:做不到,也没必要。

工程上更合理的目标是:

  • 服务端提供 至少一次投递
  • 客户端和存储侧提供 幂等去重
  • 用户体验上表现为“消息最终只展示一次”

这比盲目追求“精确一次”更现实,也更符合大规模分布式系统实践。

2. 顺序在哪个范围内保证

IM 中通常不追求全局顺序,而是保证:

  • 单聊会话内有序
  • 群聊在同一会话维度尽量有序
  • 跨会话无需有序

换句话说,顺序保证的最小粒度是 conversationId,而不是整个系统。

3. 消息写入和消息投递谁先谁后

推荐原则:

  • 先落库/落日志,再投递
  • 如果在线投递失败,依然可以依赖离线拉取恢复

这样系统的“真相源”是存储和日志,而不是连接层内存。

4. 客户端以什么为准恢复消息

不是按时间戳,而是按 会话游标 cursor / seq 恢复。时间戳会受时钟漂移影响,游标才适合作为可靠恢复基准。

3.2 推荐的消息语义模型

对大多数企业 IM,推荐采用下面的语义定义:

  • 发送语义:客户端发送成功,表示服务端已接收入队,不代表对端已收到
  • 存储语义:服务端先持久化消息,再进行异步在线投递
  • 投递语义:至少一次投递
  • 展示语义:客户端按 msgId 幂等去重
  • 顺序语义:会话内按 seq 单调递增展示
  • 恢复语义:客户端断线重连后按最后确认的 seq 拉取增量消息

把这些规则先定下来,后续架构设计才有稳定边界。

四、生产级分布式 IM 总体架构

4.1 总体分层

(此处省略架构图,后续用表格说明各层职责。)

4.2 各层职责

层级 核心组件 职责
接入层 Connection Gateway 维持 WebSocket 长连接、认证、心跳、限流
核心业务层 Message / Group / Session / Presence 消息写入、会话序列生成、群成员解析、状态同步
事件层 Kafka 流量削峰、异步投递、分区顺序、失败重试
状态层 Redis Cluster 在线状态、连接路由、热点会话缓存
持久化层 MySQL / TiDB 消息、会话游标、成员关系、回执信息
检索与审计 ES / ClickHouse 全文搜索、运营分析、审计追踪
附件存储 MinIO / S3 图片、语音、文件

4.3 为什么要拆成“接入层 + 核心层”

原因非常关键:

  • 连接层和业务层扩缩容诉求不同
    • 连接层按在线连接数扩
    • 业务层按消息吞吐扩
  • 连接层更接近网络栈
    • 关注 fd、心跳、背压、慢连接
  • 业务层更接近状态机
    • 关注消息语义、序列号、一致性、群成员关系

如果把连接、业务、存储都塞在一个服务里,早期看起来简单,后面几乎一定会在扩容、发布、故障隔离上付出巨大代价。

五、关键架构原理拆解

5.1 连接管理:如何维护百万长连接

长连接系统的第一个核心问题是:如何在分布式集群中准确找到用户当前连接在哪台机器上。

路由模型

推荐采用两级路由:

  1. 本地连接池
    • 每个 Connection Gateway 维护本地 connId -> Connection 映射
  2. 全局路由表
    • Redis 中维护 userId -> routeInfo

示例:

route:user:1001 -> {
  "gatewayId": "conn-gw-12",
  "connId": "c-8fa1b2",
  "deviceId": "ios-001",
  "lastActiveAt": 1715750000
}

为什么 Redis 路由表要带 TTL

因为连接是易失状态,网关异常退出时不一定有机会主动清理路由。TTL 可以兜底,避免脏路由长期存在。

典型做法:

  • 心跳间隔:10 秒
  • 路由 TTL:30 秒
  • 每次收到客户端心跳时刷新 TTL

多端登录怎么处理

这取决于业务策略:

  • 单端在线:新连接顶掉旧连接
  • 多端在线:userId -> deviceId -> routeInfo
  • 同端单实例:同一个 deviceType 只允许一个活跃连接

生产上更常见的是:

  • 手机端单实例
  • Web 和桌面端允许并存

因此路由模型最好从一开始就支持多设备维度。

5.2 消息链路:为什么不能“收到消息就直接推给对方”

因为那样消息只存在于内存,一旦服务异常就可能丢失。

生产上合理的主链路应该是:

Client A
  -> Connection Gateway
  -> Message Service
  -> 分配会话序列号 seq
  -> 持久化消息
  -> 发送消息事件到 Kafka
  -> Online Dispatcher 查找对端路由
  -> 投递给目标 Gateway
  -> Client B 收到消息
  -> Client B ack

这条链路的核心思想是:

  • 存储是事实来源
  • 在线推送是加速路径
  • 离线拉取是兜底路径

因此,即便在线投递失败,只要消息已经落库,客户端重连或主动同步后仍然能恢复。

5.3 顺序性:为什么要按会话分区

如果单聊消息的顺序被打乱,用户感知会非常明显。

推荐做法是:

  • conversationId 作为 Kafka 分区键
  • 一个会话的所有消息进入同一分区
  • 消费端按分区顺序处理

这样可以保证:

  • 同一会话内消息天然有序
  • 不同会话之间可以并行处理

这也是 IM 系统中“局部有序、全局并行”的经典设计。

5.4 群聊扇出:系统真正的压力点

群聊的难点不在“存一条消息”,而在“把这条消息发给多少人”。

以一个 5 万人大群为例,一条群消息可能引发:

  • 5 万条在线投递请求
  • 5 万个未读计数更新
  • 5 万个离线游标推进
  • 若带推送,还会触发成千上万条 push 事件

如果处理不当,一个大群就足以把系统打穿。

群消息推荐架构

建议拆成两个阶段:

阶段一:消息入库
  • 写一条群消息主记录
  • 生成群会话 seq
  • 发布 group_message_created 事件
阶段二:异步扇出

根据群规模走不同策略:

  • 小群:写扩散
  • 大群:读扩散或混合扩散

什么是写扩散、读扩散

写扩散

每个成员都生成一份收件箱或游标记录。

优点:

  • 读取简单,适合高频读
  • 在线与离线统一

缺点:

  • 发消息成本高
  • 大群写放大严重
读扩散

群消息只存一份,成员读取时按自己的阅读游标拉取。

优点:

  • 写入成本低
  • 适合超大群

缺点:

  • 读取逻辑复杂
  • 未读数和历史拉取更难优化

实战建议

  • 500 人以下:写扩散
  • 500 到 5000:混合扩散
  • 5000 以上:读扩散

这是比较常见、也最符合成本收益比的工程策略。

5.5 在线状态:为什么 Presence 不应强一致

很多系统初期会执着于“用户在线状态必须绝对准确”,结果把状态系统做得非常重。

实际上 Presence 更适合采用 最终一致性

  • 心跳 10 秒刷新一次
  • Redis TTL 30 秒
  • 允许几秒钟误差

原因是:

  • 在线状态本身只是辅助体验数据
  • 过度追求强一致会引入过多同步开销
  • 用户对“在线”状态本来就能接受短暂偏差

要把强一致预算留给真正关键的链路,比如消息写入和会话游标推进。

六、核心数据模型设计

6.1 核心实体

一套可演进的 IM 数据模型,至少应包含:

  • 用户 user
  • 会话 conversation
  • 消息 message
  • 会话成员 conversation_member
  • 群组 group
  • 群成员 group_member
  • 已读游标 read_cursor
  • 在线路由 route
  • 投递回执 delivery_receipt

6.2 会话与消息表设计

conversation

CREATE TABLE conversation (
    id                BIGINT PRIMARY KEY,
    biz_type          TINYINT NOT NULL COMMENT '1=single,2=group',
    owner_key         VARCHAR(128) NOT NULL COMMENT '单聊去重键或群ID',
    last_msg_id       VARCHAR(64) DEFAULT NULL,
    last_seq          BIGINT NOT NULL DEFAULT 0,
    last_message_at   BIGINT NOT NULL DEFAULT 0,
    created_at        BIGINT NOT NULL,
    updated_at        BIGINT NOT NULL,
    UNIQUE KEY uk_owner_key (biz_type, owner_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

message

CREATE TABLE message (
    id                BIGINT PRIMARY KEY,
    msg_id            VARCHAR(64) NOT NULL,
    conversation_id   BIGINT NOT NULL,
    seq               BIGINT NOT NULL,
    sender_id         BIGINT NOT NULL,
    receiver_id       BIGINT DEFAULT NULL,
    group_id          BIGINT DEFAULT NULL,
    msg_type          TINYINT NOT NULL,
    payload           JSON NOT NULL,
    client_msg_id     VARCHAR(64) DEFAULT NULL,
    send_time         BIGINT NOT NULL,
    server_time       BIGINT NOT NULL,
    status            TINYINT NOT NULL DEFAULT 1,
    ext               JSON DEFAULT NULL,
    created_at        BIGINT NOT NULL,
    updated_at        BIGINT NOT NULL,
    UNIQUE KEY uk_msg_id (msg_id),
    UNIQUE KEY uk_conv_seq (conversation_id, seq),
    KEY idx_conv_time (conversation_id, server_time)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

read_cursor

CREATE TABLE read_cursor (
    id                BIGINT PRIMARY KEY,
    user_id           BIGINT NOT NULL,
    conversation_id   BIGINT NOT NULL,
    read_seq          BIGINT NOT NULL DEFAULT 0,
    updated_at        BIGINT NOT NULL,
    UNIQUE KEY uk_user_conv (user_id, conversation_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

为什么需要 (conversation_id, seq) 唯一索引

因为 seq 是会话级顺序锚点,后续:

  • 历史消息分页
  • 增量同步
  • 已读推进
  • 撤回与重放

都会围绕它展开。

6.3 Redis 键设计

route:user:{userId}:{deviceId}   -> routeInfo(JSON), TTL=30s
presence:user:{userId}           -> online/offline, TTL=30s
conv:last_seq:{conversationId}   -> current seq
group:members:{groupId}          -> set(userId)
inbox:dirty:{userId}             -> 待同步会话集合
dedup:client_msg:{clientMsgId}   -> msgId, TTL=24h

Redis 里哪些是缓存,哪些是状态

这是工程上很容易混淆的一点。

  • group:members 属于缓存,可丢失可重建
  • routepresence 属于短期状态,丢失后可由心跳恢复
  • conv:last_seq 如果只存在 Redis 就危险,必须有数据库或日志兜底
  • dedup 是性能加速层,最终仍应依赖数据库唯一键做强兜底

七、生产级消息发送流程设计

7.1 单聊发送时序

1. 客户端发送 SendMessage(clientMsgId, toUserId, payload)
2. Gateway 做鉴权、限流、协议校验
3. Message Service 根据 userA-userB 定位或创建 conversation
4. 获取 conversation 下一个 seq
5. 写 message 表
6. 更新 conversation.last_seq / last_msg_id
7. 发送 message_created 事件到 Kafka
8. 返回发送成功给客户端(包含 msgId、seq、serverTime)
9. Dispatcher 异步查路由进行在线投递
10. 接收端 ack 后更新 delivery/read 状态

关键原则

  • 客户端拿到发送成功回执时,消息必须已持久化
  • 在线投递不应该阻塞发送成功回包
  • 客户端消息状态应区分 sent / delivered / read

7.2 单聊发送的生产级 Go 示例

下面这段代码不是“玩具代码”,而是更接近生产实践的写法,体现了幂等、防重、会话顺序、事件投递四个关键点。

package message

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "fmt"
    "time"
)

type SendMessageCmd struct {
    SenderID    int64
    ReceiverID  int64
    ClientMsgID string
    MsgType     int32
    Payload     map[string]any
}

type SendMessageResp struct {
    MsgID      string
    Seq        int64
    ServerTime int64
}

type ConversationRepo interface {
    FindOrCreateSingle(ctx context.Context, tx *sql.Tx, senderID, receiverID int64) (Conversation, error)
    NextSeq(ctx context.Context, tx *sql.Tx, conversationID int64) (int64, error)
    UpdateLastMessage(ctx context.Context, tx *sql.Tx, conversationID int64, msgID string, seq int64, ts int64) error
}

type MessageRepo interface {
    FindByClientMsgID(ctx context.Context, senderID int64, clientMsgID string) (*Message, error)
    Insert(ctx context.Context, tx *sql.Tx, msg Message) error
}

type DedupStore interface {
    Get(ctx context.Context, key string) (string, error)
    SetNX(ctx context.Context, key, value string, ttl time.Duration) (bool, error)
}

type EventPublisher interface {
    PublishMessageCreated(ctx context.Context, evt MessageCreatedEvent) error
}

type TxManager interface {
    WithTx(ctx context.Context, fn func(tx *sql.Tx) error) error
}

type Service struct {
    txm       TxManager
    convRepo  ConversationRepo
    msgRepo   MessageRepo
    dedup     DedupStore
    publisher EventPublisher
    idgen     func() string
    now       func() time.Time
}

func (s *Service) SendSingleMessage(ctx context.Context, cmd SendMessageCmd) (*SendMessageResp, error) {
    if cmd.ClientMsgID == "" {
        return nil, errors.New("clientMsgId is required")
    }
    if cmd.SenderID == 0 || cmd.ReceiverID == 0 {
        return nil, errors.New("sender or receiver is invalid")
    }

    dedupKey := fmt.Sprintf("dedup:client_msg:%d:%s", cmd.SenderID, cmd.ClientMsgID)
    if msgID, err := s.dedup.Get(ctx, dedupKey); err == nil && msgID != "" {
        oldMsg, findErr := s.msgRepo.FindByClientMsgID(ctx, cmd.SenderID, cmd.ClientMsgID)
        if findErr == nil && oldMsg != nil {
            return &SendMessageResp{
                MsgID:      oldMsg.MsgID,
                Seq:        oldMsg.Seq,
                ServerTime: oldMsg.ServerTime,
            }, nil
        }
    }

    serverTime := s.now().UnixMilli()
    msgID := s.idgen()

    reserved, err := s.dedup.SetNX(ctx, dedupKey, msgID, 24*time.Hour)
    if err != nil {
        return nil, err
    }
    if !reserved {
        oldMsg, findErr := s.msgRepo.FindByClientMsgID(ctx, cmd.SenderID, cmd.ClientMsgID)
        if findErr == nil && oldMsg != nil {
            return &SendMessageResp{
                MsgID:      oldMsg.MsgID,
                Seq:        oldMsg.Seq,
                ServerTime: oldMsg.ServerTime,
            }, nil
        }
        return nil, errors.New("duplicated request")
    }

    var resp SendMessageResp
    payloadBytes, err := json.Marshal(cmd.Payload)
    if err != nil {
        return nil, err
    }

    err = s.txm.WithTx(ctx, func(tx *sql.Tx) error {
        conv, err := s.convRepo.FindOrCreateSingle(ctx, tx, cmd.SenderID, cmd.ReceiverID)
        if err != nil {
            return err
        }

        seq, err := s.convRepo.NextSeq(ctx, tx, conv.ID)
        if err != nil {
            return err
        }

        msg := Message{
            MsgID:          msgID,
            ConversationID: conv.ID,
            Seq:            seq,
            SenderID:       cmd.SenderID,
            ReceiverID:     sql.NullInt64{Int64: cmd.ReceiverID, Valid: true},
            MsgType:        cmd.MsgType,
            Payload:        payloadBytes,
            ClientMsgID:    sql.NullString{String: cmd.ClientMsgID, Valid: true},
            SendTime:       serverTime,
            ServerTime:     serverTime,
            Status:         1,
            CreatedAt:      serverTime,
            UpdatedAt:      serverTime,
        }

        if err := s.msgRepo.Insert(ctx, tx, msg); err != nil {
            return err
        }

        if err := s.convRepo.UpdateLastMessage(ctx, tx, conv.ID, msgID, seq, serverTime); err != nil {
            return err
        }

        resp = SendMessageResp{
            MsgID:      msgID,
            Seq:        seq,
            ServerTime: serverTime,
        }
        return nil
    })
    if err != nil {
        return nil, err
    }

    evt := MessageCreatedEvent{
        MsgID:      resp.MsgID,
        SenderID:   cmd.SenderID,
        ReceiverID: cmd.ReceiverID,
        Seq:        resp.Seq,
        ServerTime: resp.ServerTime,
    }
    if err := s.publisher.PublishMessageCreated(ctx, evt); err != nil {
        return nil, err
    }

    return &resp, nil
}

这段代码体现了几个生产级关键点:

  • 使用 clientMsgId 做客户端重试幂等
  • Redis 只做第一层去重,数据库仍是最终兜底
  • seq 在事务内分配,避免会话内乱序
  • 发送成功回包发生在消息持久化之后
  • 在线投递由异步事件承担,而非同步直推

7.3 群聊消息处理流程

群聊不能直接复用单聊逻辑,否则一旦群规模上来就会出问题。

推荐流程如下:

1. 校验发送者是否为群成员
2. 获取 groupConversationId
3. 申请群会话 seq
4. 写入群消息主表
5. 投递 group_message_created 事件
6. Fanout Worker 根据群规模选择策略:
   - 小群:批量写入成员 inbox
   - 大群:仅更新群游标,在线用户实时推送
7. 在线用户收到推送,离线用户后续增量同步

为什么群成员列表不能每次都查数据库

因为群聊是典型热点路径。

生产上通常这样做:

  • MySQL 存权威成员关系
  • Redis 缓存热点群成员集
  • 本地缓存保存超热点群的短期快照
  • 群成员变更时,通过事件广播失效缓存

八、连接层工程化设计

8.1 Gateway 不只是“收发 WebSocket”

一个成熟的 Connection Gateway 至少要承担:

  • Token 鉴权
  • 协议版本协商
  • 心跳维持
  • 连接级限流
  • 包体大小校验
  • 非法请求拦截
  • 慢连接隔离
  • 写队列背压
  • 路由注册与续约

如果只是简单调用 ReadMessage / WriteMessage,在真实流量下很快就会暴露问题。

8.2 生产级连接对象设计

package gateway

import (
    "context"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gorilla/websocket"
)

type Connection struct {
    userID        int64
    deviceID      string
    connID        string
    ws            *websocket.Conn
    writeCh       chan []byte
    closed        atomic.Bool
    lastHeartbeat atomic.Int64
    closeOnce     sync.Once
}

func NewConnection(userID int64, deviceID, connID string, ws *websocket.Conn) *Connection {
    c := &Connection{
        userID:   userID,
        deviceID: deviceID,
        connID:   connID,
        ws:       ws,
        writeCh:  make(chan []byte, 1024),
    }
    c.lastHeartbeat.Store(time.Now().Unix())
    return c
}

func (c *Connection) Start(ctx context.Context, onClose func()) {
    go c.readLoop(ctx, onClose)
    go c.writeLoop(onClose)
}

func (c *Connection) TouchHeartbeat() {
    c.lastHeartbeat.Store(time.Now().Unix())
}

func (c *Connection) IsHeartbeatExpired(timeout time.Duration) bool {
    last := time.Unix(c.lastHeartbeat.Load(), 0)
    return time.Since(last) > timeout
}

func (c *Connection) AsyncWrite(data []byte) error {
    if c.closed.Load() {
        return ErrConnClosed
    }
    select {
    case c.writeCh <- data:
        return nil
    default:
        c.Close()
        return ErrBackpressure
    }
}

func (c *Connection) Close() {
    c.closeOnce.Do(func() {
        c.closed.Store(true)
        close(c.writeCh)
        _ = c.ws.Close()
    })
}

func (c *Connection) readLoop(ctx context.Context, onClose func()) {
    defer func() {
        c.Close()
        onClose()
    }()

    _ = c.ws.SetReadDeadline(time.Now().Add(70 * time.Second))
    c.ws.SetPongHandler(func(string) error {
        c.TouchHeartbeat()
        return c.ws.SetReadDeadline(time.Now().Add(70 * time.Second))
    })

    for {
        select {
        case <-ctx.Done():
            return
        default:
            _, data, err := c.ws.ReadMessage()
            if err != nil {
                return
            }
            c.TouchHeartbeat()
            _ = handleInboundPacket(ctx, c, data)
        }
    }
}

func (c *Connection) writeLoop(onClose func()) {
    defer func() {
        c.Close()
        onClose()
    }()

    ticker := time.NewTicker(20 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case data, ok := <-c.writeCh:
            if !ok {
                return
            }
            _ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
            if err := c.ws.WriteMessage(websocket.BinaryMessage, data); err != nil {
                return
            }
        case <-ticker.C:
            _ = c.ws.SetWriteDeadline(time.Now().Add(5 * time.Second))
            if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil {
                return
            }
        }
    }
}

这段代码解决了什么问题

  • 使用独立 writeLoop 防止业务线程被慢连接阻塞
  • 使用 writeCh 做连接级背压
  • 利用 Ping/Pong + ReadDeadline 发现假死连接
  • CloseOnce 避免重复关闭造成 panic
  • 读写解耦,利于后续插入压缩、批量合并、统计打点

8.3 Gateway 路由注册与续约

type RouteInfo struct {
    GatewayID    string `json:"gatewayId"`
    ConnID       string `json:"connId"`
    UserID       int64  `json:"userId"`
    DeviceID     string `json:"deviceId"`
    ConnectTime  int64  `json:"connectTime"`
    LastActiveAt int64  `json:"lastActiveAt"`
}

func (s *RouteService) Register(ctx context.Context, route RouteInfo) error {
    key := fmt.Sprintf("route:user:%d:%s", route.UserID, route.DeviceID)
    buf, _ := json.Marshal(route)
    return s.redis.Set(ctx, key, buf, 30*time.Second).Err()
}

func (s *RouteService) Renew(ctx context.Context, userID int64, deviceID string) error {
    key := fmt.Sprintf("route:user:%d:%s", userID, deviceID)
    return s.redis.Expire(ctx, key, 30*time.Second).Err()
}

工程建议:

  • 路由刷新不要每个心跳都写完整 JSON,可只 EXPIRE
  • 超大规模下可做路由分片,避免单实例 Redis 热点
  • 路由 key 必须可从 userId + deviceId 直接推导,便于快速查找

九、高并发与可扩展设计

9.1 水平扩展的关键原则

要让 IM 系统真正可扩展,必须满足三点:

  1. 连接层尽量无状态
  2. 消息处理按会话分区
  3. 热点操作可异步、可削峰、可降级

这三点基本决定了系统是否能从“十万在线”长到“百万在线”。

9.2 消息队列分区策略

Kafka 的分区键建议这样设计:

  • 单聊:conversationId
  • 群聊:groupIdgroupConversationId
  • 回执:conversationId

这样可以获得:

  • 会话内顺序
  • 分区间并行
  • 消费者水平扩容

分区数如何估算

经验公式:

分区数 = 峰值吞吐 / 单分区稳定吞吐 * 冗余系数

如果单分区稳定处理 3000 msg/s,峰值 30 万 msg/s,则:

300000 / 3000 = 100
100 * 1.5 ≈ 150 分区

实际生产中通常会向上取整,例如 192 或 256 分区,便于后续扩容。

9.3 限流、背压与降级

分布式 IM 最怕的不是“平均负载高”,而是“突发洪峰”。

必须分层做保护:

连接层保护

  • 单连接 QPS 限流
  • 包体大小限制
  • 鉴权失败快速断开
  • 写队列满时断开慢连接

业务层保护

  • 单用户发送频控
  • 单群发言频控
  • 大群消息异步化
  • 敏感词/风控异步旁路

存储层保护

  • 批量写
  • 异步索引
  • 冷热分层
  • 热点会话缓存

用户体验降级

系统压力过大时,优先保核心能力:

  1. 单聊消息发送与接收
  2. 小群消息
  3. 历史同步
  4. 在线状态
  5. 输入中、回执、已读、推送补偿等弱核心能力

这类降级策略需要在设计时就预埋,不能等故障发生后临时补。

9.4 大群专项治理

大群是 IM 的系统性风险点,建议单独治理:

  • 大群分级:普通群、千人群、万人群
  • 大群消息走专用 Topic
  • 大群未读计数异步聚合
  • 大群只对在线用户实时推送,离线用户读扩散拉取
  • 大群成员缓存独立分片
  • 大群支持发言频控、管理员模式、慢启动广播

这类“业务规则”实际上是架构稳定性的组成部分。

十、可靠性设计:不丢、不乱、可恢复

10.1 幂等设计

幂等至少要做三层:

  1. 客户端层
    • 每条消息带 clientMsgId
  2. 缓存层
    • Redis SETNX 做短期快速去重
  3. 数据库层
    • 唯一键做最终强约束

这三层缺一不可。

10.2 ACK 设计

建议把 ACK 拆成三类:

  • server_ack
    • 服务端已接收并持久化
  • deliver_ack
    • 对端客户端已收到
  • read_ack
    • 对端已读

这样客户端才能准确展示:

  • 发送中
  • 已发送
  • 已送达
  • 已读

为什么不要把“写入成功”和“对端收到”混为一谈

因为这两个语义完全不同:

  • 前者依赖服务端落库成功
  • 后者依赖目标客户端在线且网络可达

混在一起会导致前端状态机混乱,也会让排障非常困难。

10.3 断线重连与离线同步

客户端断线后,不能简单“重新登录然后继续收消息”,必须基于游标补偿。

推荐流程:

1. 客户端重连并上报 deviceId、token、lastAckedSeqMap
2. 服务端鉴权并重建 route
3. Sync Service 根据各会话 read/sync cursor 拉取增量消息
4. 按 seq 顺序回放
5. 恢复在线推送

为什么恢复时要按会话游标,而不是全局消息 ID

因为实际系统中:

  • 不同会话处理并发度不同
  • 全局 ID 不适合表达会话内顺序
  • 已读与未读统计天然基于会话

因此恢复以“会话游标”为中心是更稳妥的设计。

10.4 Outbox 模式避免“库写成功但事件丢失”

这是很多系统容易忽略的可靠性漏洞。

如果发送流程是:

  1. 事务提交数据库
  2. 再调用 Kafka 发送消息

那么在步骤 1 和步骤 2 之间服务崩溃,就会出现:

  • 消息已落库
  • 但没有任何投递事件

用户看到“发送成功”,对端却永远收不到在线消息。

推荐使用 Outbox Pattern

CREATE TABLE message_outbox (
    id              BIGINT PRIMARY KEY,
    event_type      VARCHAR(64) NOT NULL,
    aggregate_id    BIGINT NOT NULL,
    payload         JSON NOT NULL,
    status          TINYINT NOT NULL DEFAULT 0,
    retry_count     INT NOT NULL DEFAULT 0,
    next_retry_at   BIGINT NOT NULL DEFAULT 0,
    created_at      BIGINT NOT NULL,
    updated_at      BIGINT NOT NULL
);

事务内同时写:

  • message
  • conversation
  • message_outbox

然后由独立 Outbox Worker 轮询并投递 Kafka,成功后再更新 outbox 状态。

这会让系统从“尽量可靠”提升到“可证明恢复”。

十一、可观测性与运维治理

11.1 监控指标设计

IM 系统不能只看 CPU、内存,核心是业务指标和链路指标。

接入层指标

  • 当前在线连接数
  • 新增连接数/断开连接数
  • 心跳超时数
  • 每连接平均入站/出站速率
  • 写队列积压数
  • 慢连接断开数

消息链路指标

  • 消息发送 QPS
  • 单聊/群聊占比
  • Kafka 生产/消费延迟
  • 在线投递成功率
  • 离线同步耗时
  • 消息端到端延迟 P50/P95/P99

存储与状态指标

  • MySQL TPS / 慢 SQL
  • Redis QPS / 命中率 / 热 key
  • Outbox backlog
  • DLQ 积压量

11.2 日志与 Trace

每条消息建议统一打通以下追踪字段:

  • traceId
  • msgId
  • clientMsgId
  • conversationId
  • seq
  • senderId
  • receiverId / groupId
  • gatewayId

这样在排查“消息丢了”“消息重复”“对端没收到”“顺序乱了”等问题时,才能从网关、业务、队列、存储一路串起来。

11.3 常见故障场景与处理

场景一:Gateway 节点重启,用户瞬时大规模掉线

解决:

  • 客户端指数退避重连
  • Gateway 发布前摘流
  • 路由 TTL 自动失效
  • Sync Service 重连后自动补拉增量消息

场景二:Redis 抖动导致路由查找失败

解决:

  • Dispatcher 短暂重试
  • 路由未命中则标记为离线投递
  • 依赖离线同步补偿,而不是同步阻塞重试

场景三:Kafka 消费堆积

解决:

  • 优先看热点群或某个异常分区
  • 动态扩消费者
  • 群聊与单聊拆 Topic
  • 必要时关闭非核心事件消费

场景四:大群突发广播引发雪崩

解决:

  • 大群专用 Topic
  • 分批扇出
  • 在线/离线分流
  • push 补偿延后
  • 未读异步聚合

十二、Kubernetes 部署与生产配置

12.1 Connection Gateway 部署示例

apiVersion: apps/v1
kind: Deployment
metadata:
  name: im-connection-gateway
  labels:
    app: im-connection-gateway
spec:
  replicas: 20
  selector:
    matchLabels:
      app: im-connection-gateway
  template:
    metadata:
      labels:
        app: im-connection-gateway
    spec:
      terminationGracePeriodSeconds: 60
      containers:
      - name: gateway
        image: registry.example.com/im/connection-gateway:1.0.0
        ports:
        - name: ws
          containerPort: 8000
        - name: grpc
          containerPort: 9000
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: GOMAXPROCS
          value: "4"
        - name: REDIS_ADDR
          value: "redis-cluster:6379"
        - name: KAFKA_BROKERS
          value: "kafka-0:9092,kafka-1:9092,kafka-2:9092"
        resources:
          requests:
            cpu: "2"
            memory: "2Gi"
          limits:
            cpu: "4"
            memory: "6Gi"
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 5
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 20
          periodSeconds: 10
        lifecycle:
          preStop:
            exec:
              command:
              - /bin/sh
              - -c
              - |
                wget -qO- http://127.0.0.1:8000/drain && sleep 30

为什么需要 preStop + drain

因为连接服务不是普通无状态 HTTP 服务。

如果 Pod 被直接杀掉,会导致:

  • 大量连接同时断开
  • 短时间重连洪峰
  • 路由刷新风暴

推荐流程是:

  1. 先摘除新连接流量
  2. 让旧连接在 20 到 30 秒内自然迁移
  3. 再关闭进程

12.2 HPA 扩缩容建议

连接层不要只按 CPU 扩容,建议综合:

  • CPU 使用率
  • 当前连接数
  • 出站消息速率

例如:

  • 单 Pod 连接数阈值 2 万
  • CPU > 65%
  • 出站消息 > 8000 msg/s

三者中任意满足即可扩容。

12.3 Nginx/Envoy 接入建议

典型要求:

  • 支持四层或七层 WebSocket 代理
  • 打开长连接超时
  • 开启 TLS
  • 支持源地址透传
  • 可做接入层限流

Nginx 示例:

worker_processes auto;

events {
    worker_connections 65535;
    use epoll;
}

http {
    upstream im_gateway {
        least_conn;
        server 10.0.0.11:8000 max_fails=3 fail_timeout=10s;
        server 10.0.0.12:8000 max_fails=3 fail_timeout=10s;
        server 10.0.0.13:8000 max_fails=3 fail_timeout=10s;
    }

    server {
        listen 443 ssl http2;
        server_name im.example.com;

        ssl_certificate     /etc/nginx/cert/server.crt;
        ssl_certificate_key /etc/nginx/cert/server.key;

        location /ws {
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_read_timeout 3600s;
            proxy_send_timeout 3600s;
            proxy_pass http://im_gateway;
        }
    }
}

十三、真实业务场景演进案例

下面给出一个更贴近实际的架构演进路径。

13.1 第一阶段:单体版 IM

架构:

  • 一个 WebSocket 服务
  • 一个 MySQL
  • 一个 Redis

容量:

  • 1 万在线以内
  • 适合内部系统或 MVP 验证

问题:

  • 连接与业务耦合
  • 扩容困难
  • 故障影响面大

13.2 第二阶段:接入层与业务层拆分

架构:

  • Connection Gateway 独立部署
  • Message Service 独立部署
  • Redis 做全局路由

收益:

  • 连接层独立扩容
  • 消息业务可单独演进

容量:

  • 5 到 20 万在线

13.3 第三阶段:引入消息队列与异步化

架构:

  • 增加 Kafka
  • 在线投递异步化
  • 引入 Outbox
  • 引入群聊 Fanout Worker

收益:

  • 削峰填谷
  • 顺序性更清晰
  • 失败补偿更容易

容量:

  • 20 到 80 万在线

13.4 第四阶段:大群治理与多机房

架构:

  • 单聊/群聊拆 Topic
  • 大群单独处理链路
  • 多机房部署
  • 就近接入 + 异地同步

收益:

  • 支撑百万在线
  • 单机房故障可降级运行

13.5 一个典型的线上案例

某企业协同平台的实际现象:

  • 日活 400 万
  • 峰值在线 110 万
  • 高峰期晚间 8 点出现群通知洪峰
  • 前期群消息直接写扩散,导致大群消息写库放大严重

优化过程:

  1. 将 1000 人以上群聊改为读扩散
  2. 大群消息切换到专用 Topic
  3. 在线投递按 Gateway 聚合批量推送
  4. 未读数改为异步聚合,不在主链路实时逐人更新
  5. push 通知延后 3 到 5 秒,避免与在线扇出抢资源

最终结果:

  • 峰值 CPU 从 85% 降到 52%
  • MySQL 写 TPS 降低 40%+
  • 群聊 P99 延迟从 1.6s 降到 620ms
  • 滚动发布期间重连洪峰下降约 60%

这说明 IM 系统优化的关键,不只是“换更强的中间件”,更重要的是 让消息语义与资源模型匹配

十四、容量规划与压测思路

14.1 单 Pod 连接容量估算

影响单 Pod 连接数的核心因素包括:

  • 每连接内存占用
  • goroutine 数量
  • 写缓冲大小
  • 心跳频率
  • TLS 开销

粗略估算公式:

单连接内存 = websocket对象 + 读写缓冲 + 路由状态 + goroutine栈

如果压测后平均每连接占用 25KB,6Gi 可用内存下,理论上:

6 * 1024 * 1024 KB / 25 KB ≈ 245760 连接

但生产不能按理论极限跑,通常只取 20% 到 30% 作为安全水位。

因此更合理的单 Pod 容量可能是:

  • 2 万到 5 万连接

具体值必须靠压测确认。

14.2 压测要覆盖哪些维度

不要只压“建连成功数”,还要同时压:

  • 建连速度
  • 心跳稳定性
  • 单聊吞吐
  • 群聊扇出
  • 断线重连
  • 发布摘流
  • Redis/Kafka 故障注入

推荐压测场景

  1. 10 万连接稳定 30 分钟
  2. 每秒 1 万条单聊消息持续 15 分钟
  3. 每分钟 100 条万人群消息
  4. 5% 客户端随机断线重连
  5. 滚动重启 20% Gateway 实例

没有这些压测,所谓“百万在线架构”大概率只是理论架构。

十五、常见设计误区

误区一:Redis 能扛住就说明架构没问题

错。Redis 只是状态缓存和加速层,不应该承担消息事实来源。

误区二:消息不重复等于消息可靠

错。IM 更重要的是不丢、可恢复,而不是绝不重复。

误区三:所有群都使用同一套扇出逻辑

错。群规模不同,最优策略完全不同。

误区四:在线状态必须精确到秒

错。Presence 适合最终一致,别把系统预算浪费在次要目标上。

误区五:上线前只做功能测试,不做故障演练

错。IM 的大部分事故都发生在扩缩容、网络抖动、队列堆积、热点群冲击等非功能场景。

十六、文章总结

一套真正能落地的生产级 IM 系统,核心不是“用了哪些中间件”,而是是否把以下问题设计清楚了:

  1. 消息语义
  2. 顺序边界
  3. 存储真相源
  4. 在线投递与离线恢复
  5. 大群扇出策略
  6. 连接背压与慢连接隔离
  7. 幂等、ACK、Outbox 等可靠性机制
  8. 限流、降级、可观测性与发布治理

如果用一句话概括 IM 系统架构的本质,那就是:

以消息持久化为真相源,以会话顺序为核心约束,以连接层实时投递为加速通道,以异步化与工程治理支撑高并发和持续演进。

当系统从 1 万在线走向 100 万在线时,真正决定成败的不是某一个组件的极限性能,而是整个链路是否具备:

  • 明确的语义边界
  • 稳定的可扩展模型
  • 可恢复的故障处理机制
  • 可持续演进的工程结构

这才是分布式 IM 系统从“能用”走向“生产级”的关键。

本文由云栈社区编辑整理,更多分布式架构实践内容可关注云栈社区。




上一篇:crontab定时任务避坑指南:SRE总结的8个血泪教训
下一篇:Spring Boot HikariCP 连接池生产调优指南:从原理到高并发治理
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-5-17 03:13 , Processed in 0.832954 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表