找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3206

积分

0

好友

424

主题
发表于 20 小时前 | 查看: 12| 回复: 0

当 Demo 能跑通时,我们解决的是“模型可用”;当系统要上线时,我们解决的是“业务可交付”;当并发、成本、稳定性和多租户一起压上来时,真正要回答的问题其实只有一个:怎样把 FunASR 从一个推理程序,演进成一个生产级实时语音平台

本文不再停留于“如何启动一个容器”或“怎样调用一次 WebSocket 接口”,而是从架构师和一线工程人员的视角,系统回答以下四个问题:

  • FunASR 流式识别到底是怎么工作的,为什么 chunk、cache、VAD、热词会直接影响延迟和准确率?
  • 为什么单机部署一旦进入生产就容易出现排队、显存抖动、尾延迟飙升和会话乱序?
  • 怎样从单机推理,演进到支持高并发、弹性扩缩容、会话有序、可观测、可治理的分布式系统
  • 怎样补齐生产代码、核心配置、监控指标、压测方法与真实业务案例,让文章不只“能看”,而且“能落地”?

如果你正在建设客服质检、会议纪要、语音机器人、车载语音、直播字幕、实时翻译等系统,这篇文章会更贴近真实场景。


一、为什么 FunASR Demo 在生产环境里容易失效

很多团队第一次接触 FunASR,路径都很相似:

  1. 下载模型。
  2. 跑通官方 Demo。
  3. 本地传一段音频,结果还不错。
  4. 上线后发现延迟、并发、稳定性、成本全都不对。

问题并不在于 FunASR 本身,而在于我们常常把“模型推理成功”误当成“系统工程成功”。

生产环境里的流式语音识别,至少要同时满足以下目标:

  • 低延迟:首字延迟、句尾延迟、P95/P99 尾延迟可控。
  • 高并发:成百上千甚至更多会话同时在线,且不能相互挤占导致雪崩。
  • 高可用:节点宕机、网络抖动、客户端断线重连、消息重复投递都能兜住。
  • 高精度:VAD、在线模型、离线二次修正、热词、标点、ITN 要协同工作。
  • 低成本:GPU 利用率、实例规格、批处理策略、冷热分层要能持续优化。
  • 可治理:多租户隔离、限流、配额、审计、灰度、监控、告警要完善。

真正把系统打崩的,通常不是“模型不准”,而是下面这些工程问题:

  • 会话状态无法粘住:同一个 session 被负载均衡到了不同节点,导致 cache 失效,识别上下文断裂。
  • 显存被长尾连接吃满:连接不断开、缓存不回收、batch 不受控,GPU 很快进入抖动状态。
  • VAD 和流式状态机不稳定:讲话停顿稍长就误切句,造成在线结果频繁回滚。
  • 同步串行链路过长:网关、鉴权、热词、VAD、ASR、标点全部串行,尾延迟迅速恶化。
  • 高峰流量没有缓冲层:一波洪峰直接打到推理进程,队列积压和超时同时发生。
  • 压测指标失真:只看平均 RT,不看首字延迟、实时因子 RTF、GPU SM 利用率和队列等待时间。

所以,本文的目标不是“教你启动 FunASR”,而是“教你构建一套生产级流式 ASR 体系”。


二、FunASR 流式识别的核心原理:不是调用接口这么简单

要做好架构,先要理解模型和流式机制本身。否则所有优化都只是“调参数碰运气”。

2.1 FunASR 的能力边界

从工程视角看,FunASR 不是单个模型,而是一个语音理解处理链:

  • VAD:识别哪里是有效语音,哪里是静音。
  • Online ASR:对持续输入的音频块进行流式识别,尽快给出中间结果。
  • Offline ASR:在句子结束或完整片段到齐后,做高精度二次修正。
  • Punctuation:给最终结果补标点。
  • ITN:将文本从口语形态转换为标准书写形态,如“二零二六年四月十六日”转为“2026年4月16日”。
  • Hotword:通过热词增强改善业务关键词、人名、地名、产品名的召回。

因此,生产系统优化的不只是 ASR 模型,而是整条链路的协同效率。

2.2 Paraformer 为什么适合工业流式场景

传统自回归 ASR 的核心问题是:每个 token 的生成依赖上一个 token,天然串行,延迟高,吞吐受限。

Paraformer 属于非自回归端到端模型,它通过 Predictor 先估计输出长度,再并行生成 token,从而显著降低解码串行开销。它之所以在工业场景有优势,关键在于三点:

  • 推理更容易并行化,吞吐更高。
  • 延迟与输出长度耦合更弱,更适合实时字幕和语音机器人。
  • 更适合与 chunk cache 配合,构建流式增量推理链路。

可以把传统自回归理解成“边写边想”,把 Paraformer 理解成“先想结构,再快速落笔”。前者更像逐字生成,后者更像并行补全。

2.3 流式识别为什么必须使用 chunk

真实音频不是完整文件一次性提交,而是按时间不断到达。系统不可能等用户说完整句话再识别,否则首字延迟会极差。

因此,流式识别一定要把音频切成 chunk:

  • 每个 chunk 是一个很小的时间片,如 60ms、100ms、200ms。
  • 模型对每个 chunk 做增量计算,而不是每次重算全部历史。
  • 系统在 chunk 之间维护上下文缓存,保证跨块语义连续。

chunk 的大小决定了两个核心指标:

  • chunk 越小:首字更快,但 CPU/GPU 调度更频繁,系统开销更高,精度也可能下降。
  • chunk 越大:吞吐更稳、精度更高,但实时性变差,首字延迟会上升。

在工程中,chunk 不是越小越好,而是要结合业务 SLO 找平衡:

  • 实时字幕:更关注首字延迟,chunk 往往偏小。
  • 客服质检:更关注稳定性和成本,chunk 可以略大。
  • 车载场景:既要快,又要抗抖动,通常需要自适应策略。

2.4 cache 是流式推理性能的生命线

如果没有 cache,每来一个 chunk 都从头计算历史音频,复杂度会迅速失控。

FunASR 流式识别的关键机制之一,就是在 session 级别维护 cache:

  • encoder cache:缓存历史声学特征编码状态。
  • decoder/predictor cache:缓存历史上下文或对齐信息。
  • vad state:缓存静音、讲话段边界状态。
  • partial text state:缓存当前句的中间识别文本,供客户端增量展示。

这意味着一个非常重要的架构约束:

同一个 session 的 chunk,必须按顺序进入同一个状态机实例。

如果你让同一会话的 chunk 在多个 Pod 之间随机漂移,缓存就无法复用,轻则延迟升高,重则结果错乱。

2.5 VAD 为什么常常是延迟和体验的真正决定因素

很多团队一上来只盯着 ASR 推理耗时,却忽略了 VAD 对用户体验的影响更大。

一个典型链路的延迟可拆分为:

环节 典型耗时 说明
音频采集与上传 20ms - 150ms 与客户端实现、网络质量相关
网关接入与鉴权 5ms - 30ms 取决于是否有额外业务逻辑
队列等待 0ms - 数百 ms 高峰期最容易失控
VAD 判定 50ms - 400ms 句尾等待尤其关键
在线 ASR 推理 20ms - 120ms 与模型、batch、GPU 相关
标点/ITN/后处理 5ms - 50ms 往往被低估
结果回传 10ms - 80ms 与网络和订阅方式相关

很多时候,用户觉得“字幕慢”,并不是 GPU 慢,而是:

  • VAD 在等更长的静音窗口确认句尾。
  • 网关把请求排进了内部队列。
  • 在线结果被设计成“频繁回滚”,导致客户端观感很差。

因此,生产级优化必须区分三类延迟:

  • 首字延迟 First Token Latency
  • 中间增量更新间隔 Incremental Update Interval
  • 最终结果确认延迟 Finalization Latency

2.6 2-Pass 架构为什么是工业实践中的常态

纯在线识别的优势是快,但通常不如完整上下文下的离线识别稳定。纯离线识别的优势是准,但体验太差。

所以工业系统通常采用 2-Pass:

  • 第一阶段:在线流式模型输出低延迟中间结果。
  • 第二阶段:句子结束后使用完整上下文做离线修正,补齐标点、ITN、热词校正。

这也是为什么生产系统里往往会同时存在两条路径:

  • 热路径:面向用户实时回显,追求快。
  • 冷路径:面向最终文本入库、质检、检索和摘要,追求准。

如果你把这两条路径混在一起,就很容易出现“为了更准把实时性拖死”或者“为了更快导致入库文本不可用”的问题。


三、从单机到平台:FunASR 生产架构的五个演进阶段

生产架构的演进,本质上是不断解决新的系统瓶颈。下面给出一个更贴近现实的五阶段模型。

3.1 阶段 0:单进程原型

最简形式通常是:

Client -> WebSocket Server -> FunASR Model -> Result

这个阶段适合做:

  • 模型效果验证
  • 客户端协议联调
  • 热词机制和结果格式设计

但它的问题也很明显:

  • 无法隔离接入层和推理层
  • 没有水平扩展能力
  • 连接数稍高就会把进程拖垮
  • 没有会话治理、熔断、限流、监控

3.2 阶段 1:单机多进程 + Nginx 接入

Client -> Nginx -> ASR Worker 1..N -> Local Model Cache

这一步解决的是基本接入问题:

  • 用 Nginx 支持 WebSocket 升级
  • 用 supervisor/systemd 管理多进程
  • 用本地共享模型目录减少重复加载

适用场景:

  • 开发测试
  • 部门内工具
  • 单租户、低并发场景

瓶颈:

  • 依然是单机资源池
  • GPU 无法灵活切分和调度
  • 重启、发布、扩容都依赖单点主机

3.3 阶段 2:接入层与推理层分离

Client -> Gateway -> Session Router -> ASR Pods

这一步是体系化建设的起点。你不再让客户端直连模型服务,而是引入网关和会话路由层。

职责拆分如下:

  • Gateway:鉴权、租户识别、协议转换、限流、心跳、断连处理。
  • Session Router:确保同一个 session 的 chunk 有序地进入同一条处理链。
  • ASR Pods:只负责推理,不处理复杂业务状态。

这一步的核心价值是:

  • 接入复杂度与推理复杂度解耦
  • 多租户和安全控制有了承载点
  • 后续引入 Kafka、Redis、K8s 都有了结构基础

3.4 阶段 3:事件驱动化,加入 Kafka 与 Redis

Client -> Gateway -> Kafka(audio-chunks)
                     -> Streaming Workers
                     -> Kafka(asr-results)
                     -> Push Service

这是从“同步调用系统”走向“流式平台”的关键一步。

为什么一定要引入消息队列?

  • 用于削峰填谷,隔离突发洪峰。
  • 用于显式背压,防止把推理进程直接打爆。
  • 用于解耦热路径与冷路径。
  • 用于审计与回放,便于问题排查和离线重算。

Redis 则主要承担三类状态:

  • session 状态:cache 索引、最后活跃时间、租户上下文。
  • 热词配置:租户级和会话级热词动态加载。
  • 防重与分布式锁:避免重复 close、重复 finalization。

这一阶段,系统第一次具备“像平台一样运行”的基础。

3.5 阶段 4:Kubernetes + HPA/KEDA 弹性调度

当日常并发达到几百到几千路,且不同租户的峰谷差异显著时,静态部署已经无法承载。

Kubernetes 的价值在于:

  • 把 GPU 资源池化
  • 通过 HPA 根据活跃流数、CPU、内存等扩缩容
  • 通过 KEDA 根据 Kafka lag 自动拉起离线修正 Worker
  • 配合 Ingress、Service Mesh、Prometheus 构建治理体系

这一步不仅是“自动扩容”,更重要的是把“算力调度”变成系统能力。

3.6 阶段 5:百万并发平台的本质,不是百万路 GPU 直连

很多文章写“百万并发”,喜欢直接给出一个宏大标题,但没有解释系统语义。严格来说,“百万并发”并不等于“百万路音频实时在 GPU 上同时解码”。

在实时语音平台里,百万并发往往意味着:

  • 百万级长连接在线
  • 数十万级会话处于活跃或准活跃状态
  • 某个时间窗口内,真正进入 ASR 推理层的有效流量远低于连接数
  • 通过多级缓冲、静音抑制、边缘预处理、热冷分层和租户配额,系统才有可能在成本可接受的情况下承载海量连接

换句话说,百万并发平台拼的不是“模型本身”,而是 接入架构、流控机制、状态治理、异步化和资源调度


四、生产级总体架构:控制面与数据面分离

要把系统做大,最重要的架构升级之一,就是把控制面和数据面分开。

4.1 数据面:承载高频音频流

数据面负责真正的高频实时请求:

  • WebSocket/WebRTC 音频上行
  • chunk 编排
  • session route
  • streaming inference
  • partial/final result push

数据面的要求是:

  • 低延迟
  • 少依赖
  • 最少业务逻辑
  • 避免同步调用外部系统

4.2 控制面:负责治理与配置

控制面负责低频但复杂的操作:

  • 租户配置
  • 热词管理
  • 配额和限流策略
  • 模型版本与灰度发布
  • 告警阈值和监控配置
  • 审计和追踪

控制面变化频率低,但逻辑复杂。把它混进数据面,只会让实时链路变脆弱。

4.3 推荐总体架构

                       +-------------------------+
                       |      Control Plane      |
                       | tenant, quota, hotword  |
                       | config, model release   |
                       +-----------+-------------+
                                   |
                                   v
+------------+     +---------------------------+      +----------------------+
|  Clients    | -> | Gateway / Session Router  | ---> | Kafka: audio-chunks  |
| ws/webrtc   |     | auth, route, rate-limit   |      +----------+-----------+
+------------+     +---------------------------+                  |
                                                                  v
                                                       +--------------------------+
                                                       | Streaming ASR Workers    |
                                                       | session cache, VAD, ASR  |
                                                       +------------+-------------+
                                                                    |
                                    +---------------------------+---------+--------------------+
                                    |                           |                             |
                                    v                           v                             v
                        +-------------------+     +-------------------------+      +-------------------+
                        | Redis Cluster     |     | Kafka: asr-results      |      | Trace / Metrics   |
                        | session/hotwords  |     | partial/final results   |      | logs/rtf/latency  |
                        +-------------------+     +------------+------------+      +-------------------+
                                                                |
                                                                v
                                                   +--------------------------+
                                                   | Push Service / Consumers |
                                                   | realtime push / storage  |
                                                   +-------------+------------+
                                                                 |
                                                                 v
                                                   +--------------------------+
                                                   | Offline 2-Pass Workers   |
                                                   | punc, ITN, correction    |
                                                   +--------------------------+

这个架构的关键点有三个:

  • 音频流和控制配置解耦。
  • 实时在线识别和离线修正解耦。
  • session 有序性和全局扩展性同时满足。

五、会话有序性是架构成败的分水岭

流式 ASR 和普通 HTTP 服务最大的区别,就是它是 有状态流处理

5.1 为什么必须保证 session stickiness

一个流式会话包含:

  • 当前音频偏移量
  • 上下文 cache
  • VAD 状态
  • 当前句 partial text
  • 已加载的租户热词
  • 最近一次结果序号

如果一个 session 的 chunk 被轮询到不同实例,会出现:

  • cache 丢失,推理耗时急剧上升
  • 句子切分失真
  • partial result 回退异常
  • final result 与 partial result 无法对应

5.2 三种常见实现方式

方案一:网关本地一致性哈希

优点

  • 路径短
  • 延迟最低

缺点

  • 节点变更时哈希抖动明显
  • 扩缩容时会话迁移复杂

适合中小规模集群。

方案二:Kafka 按 session_id 分区

优点

  • 天然保证同一 session 顺序
  • 容易做回放和异步消费

缺点

  • 增加一跳消息队列
  • 需要处理端到端回压和积压

适合生产级分布式系统。

方案三:独立 Session Router

优点

  • 逻辑最清晰
  • 便于做灰度、迁移和租户路由

缺点

  • 系统更复杂
  • Router 本身要高可用

适合多租户大规模平台。

工程实践里,最常见的稳定组合是:

Gateway 做接入和轻路由,Kafka 按 session_id 分区保证有序,Worker 持有本地 cache,Redis 只存索引和元数据,不存高频中间张量。

这个边界很重要。不要试图把模型 cache 序列化后频繁写 Redis,那样会把低延迟系统做成高延迟系统。


六、生产级代码实战:从接入到推理的完整骨架

下面给出一套更接近生产系统的代码结构。示例以 Python 为主,重点展示架构思路,而不是依赖某个特定 SDK 版本。

6.1 目录结构建议

funasr-platform/
├── app/
│   ├── gateway/
│   │   ├── ws_server.py
│   │   ├── auth.py
│   │   └── limiter.py
│   ├── router/
│   │   └── session_router.py
│   ├── worker/
│   │   ├── stream_worker.py
│   │   ├── model_pool.py
│   │   ├── vad_state.py
│   │   └── postprocess.py
│   ├── infra/
│   │   ├── kafka_bus.py
│   │   ├── redis_repo.py
│   │   ├── metrics.py
│   │   └── tracing.py
│   └── domain/
│       ├── session.py
│       ├── events.py
│       └── result.py
├── deploy/
│   ├── docker-compose.yml
│   ├── k8s/
│   └── nginx/
└── tests/
    ├── load/
    └── integration/

6.2 网关层:负责接入、鉴权、限流、协议校验

网关层不要直接持有模型。它应该只做四件事:

  • 识别租户和用户身份
  • 生成或校验 session_id
  • 做速率限制和连接配额
  • 把 chunk 封装成事件推送给后端
import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis.asyncio import Redis

app = FastAPI()

@dataclass
class AudioChunkEvent:
    session_id: str
    tenant_id: str
    sequence: int
    audio_bytes: bytes
    is_final: bool
    send_ts: float
    hotwords: list[str]

class ConnectionLimiter:
    def __init__(self, redis: Redis, tenant_limit: int = 2000):
        self.redis = redis
        self.tenant_limit = tenant_limit

    async def acquire(self, tenant_id: str, session_id: str) -> bool:
        key = f"tenant:{tenant_id}:active_sessions"
        value = await self.redis.scard(key)
        if value >= self.tenant_limit:
            return False
        await self.redis.sadd(key, session_id)
        await self.redis.expire(key, 3600)
        return True

    async def release(self, tenant_id: str, session_id: str) -> None:
        await self.redis.srem(f"tenant:{tenant_id}:active_sessions", session_id)

class KafkaProducerAdapter:
    async def send_audio_chunk(self, event: AudioChunkEvent) -> None:
        # 实际环境中应使用 aiokafka,key=session_id 保证同分区顺序
        payload = {
            "session_id": event.session_id,
            "tenant_id": event.tenant_id,
            "sequence": event.sequence,
            "audio_bytes": event.audio_bytes.hex(),
            "is_final": event.is_final,
            "send_ts": event.send_ts,
            "hotwords": event.hotwords,
        }
        _ = json.dumps(payload)

redis_client = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
limiter = ConnectionLimiter(redis_client)
producer = KafkaProducerAdapter()

async def authenticate(ws: WebSocket) -> tuple[str, str]:
    token = ws.query_params.get("token", "")
    tenant_id = ws.query_params.get("tenant_id", "default")
    user_id = "anonymous" if not token else f"user-{token[-6:]}"
    return tenant_id, user_id

@app.websocket("/ws/asr")
async def asr_websocket(ws: WebSocket):
    await ws.accept()
    tenant_id, user_id = await authenticate(ws)
    session_id = ws.query_params.get("session_id") or str(uuid.uuid4())
    hotwords = [w for w in ws.query_params.get("hotwords", "").split(",") if w]

    acquired = await limiter.acquire(tenant_id, session_id)
    if not acquired:
        await ws.send_json({"code": 429, "message": "tenant session quota exceeded"})
        await ws.close()
        return

    sequence = 0
    try:
        await ws.send_json(
            {
                "code": 0,
                "message": "connected",
                "session_id": session_id,
                "user_id": user_id,
            }
        )

        while True:
            message = await ws.receive()
            if "bytes" in message and message["bytes"] is not None:
                sequence += 1
                await producer.send_audio_chunk(
                    AudioChunkEvent(
                        session_id=session_id,
                        tenant_id=tenant_id,
                        sequence=sequence,
                        audio_bytes=message["bytes"],
                        is_final=False,
                        send_ts=time.time(),
                        hotwords=hotwords,
                    )
                )
            elif "text" in message and message["text"] is not None:
                payload = json.loads(message["text"])
                if payload.get("type") == "end":
                    sequence += 1
                    await producer.send_audio_chunk(
                        AudioChunkEvent(
                            session_id=session_id,
                            tenant_id=tenant_id,
                            sequence=sequence,
                            audio_bytes=b"",
                            is_final=True,
                            send_ts=time.time(),
                            hotwords=hotwords,
                        )
                    )
                    await ws.send_json({"code": 0, "message": "closing"})
                    break
    except WebSocketDisconnect:
        pass
    finally:
        await limiter.release(tenant_id, session_id)
        await ws.close()

这段代码体现了三个生产级要点:

  • 连接配额按租户控制,而不是所有请求共用一个粗暴限流。
  • 音频上行和模型推理解耦,通过事件总线处理。
  • session_id + sequence 是后续结果去重、重排、回放的基础。

6.3 领域模型:显式表示会话状态

很多系统稳定性差,不是因为模型,而是因为状态都散落在各种 dict 里,没人知道一个 session 当前到底处于什么阶段。

from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import time

class SessionStage(str, Enum):
    CREATED = "created"
    STREAMING = "streaming"
    FINALIZING = "finalizing"
    CLOSED = "closed"

@dataclass
class SessionState:
    session_id: str
    tenant_id: str
    stage: SessionStage = SessionStage.CREATED
    last_sequence: int = 0
    partial_text: str = ""
    final_text: str = ""
    vad_cache: dict[str, Any] = field(default_factory=dict)
    asr_cache: dict[str, Any] = field(default_factory=dict)
    hotwords: list[str] = field(default_factory=list)
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)

    def touch(self) -> None:
        self.updated_at = time.time()

    def validate_sequence(self, sequence: int) -> bool:
        return sequence == self.last_sequence + 1

    def mark_streaming(self, sequence: int) -> None:
        self.stage = SessionStage.STREAMING
        self.last_sequence = sequence
        self.touch()

    def mark_finalizing(self, sequence: int) -> None:
        self.stage = SessionStage.FINALIZING
        self.last_sequence = sequence
        self.touch()

    def mark_closed(self) -> None:
        self.stage = SessionStage.CLOSED
        self.touch()

生产系统里,显式状态机会带来三个收益:

  • 出问题时可以定位 session 卡在哪个阶段。
  • 重试、幂等、回收逻辑有统一边界。
  • 监控指标可以直接从状态机导出。

6.4 推理 Worker:把“单模型调用”升级为“可批处理的状态机执行器”

真正的难点在这里。流式 Worker 不只是接个 chunk 调一次模型,而是要兼顾:

  • 按 session 顺序执行
  • 维护 cache
  • 控制 batch 规模
  • 在低延迟和高吞吐之间动态折中

下面给出一个可落地的执行骨架。

import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

@dataclass
class DecodeOutput:
    session_id: str
    sequence: int
    text: str
    is_final: bool
    confidence: float
    latency_ms: int

class ModelPool:
    def __init__(self, model_manager: Any, max_batch_size: int = 8, batch_wait_ms: int = 15):
        self.model_manager = model_manager
        self.max_batch_size = max_batch_size
        self.batch_wait_ms = batch_wait_ms
        self.pending: list[tuple[dict, asyncio.Future]] = []
        self.lock = asyncio.Lock()
        self.loop_task = asyncio.create_task(self._batch_loop())

    async def infer(self, request: dict) -> dict:
        future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.pending.append((request, future))
        return await future

    async def _batch_loop(self) -> None:
        while True:
            await asyncio.sleep(self.batch_wait_ms / 1000.0)
            async with self.lock:
                if not self.pending:
                    continue
                batch = self.pending[: self.max_batch_size]
                self.pending = self.pending[self.max_batch_size :]

            requests = [item[0] for item in batch]
            futures = [item[1] for item in batch]
            try:
                results = await self.model_manager.batch_generate(requests)
                for future, result in zip(futures, results):
                    if not future.done():
                        future.set_result(result)
            except Exception as exc:
                for future in futures:
                    if not future.done():
                        future.set_exception(exc)

class SessionRepository:
    def __init__(self):
        self.sessions: dict[str, SessionState] = {}

    def get_or_create(self, session_id: str, tenant_id: str) -> SessionState:
        state = self.sessions.get(session_id)
        if state is None:
            state = SessionState(session_id=session_id, tenant_id=tenant_id)
            self.sessions[session_id] = state
        return state

    def delete(self, session_id: str) -> None:
        self.sessions.pop(session_id, None)

class StreamingWorker:
    def __init__(self, model_pool: ModelPool, repository: SessionRepository):
        self.model_pool = model_pool
        self.repository = repository
        self.session_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    async def handle_chunk(self, event: dict) -> DecodeOutput:
        session_id = event["session_id"]
        tenant_id = event["tenant_id"]
        sequence = event["sequence"]
        start = time.perf_counter()

        async with self.session_locks[session_id]:
            state = self.repository.get_or_create(session_id, tenant_id)
            if not state.validate_sequence(sequence):
                raise ValueError(
                    f"out-of-order chunk: session={session_id}, expected={state.last_sequence + 1}, got={sequence}"
                )

            request = {
                "audio_bytes": bytes.fromhex(event["audio_bytes"]),
                "cache": state.asr_cache,
                "vad_cache": state.vad_cache,
                "hotwords": event.get("hotwords", state.hotwords),
                "is_final": event["is_final"],
            }

            if event["is_final"]:
                state.mark_finalizing(sequence)
            else:
                state.mark_streaming(sequence)

            result = await self.model_pool.infer(request)

            state.partial_text = result.get("partial_text", state.partial_text)
            if result.get("final_text"):
                state.final_text = result["final_text"]
            if result.get("cache") is not None:
                state.asr_cache = result["cache"]
            if result.get("vad_cache") is not None:
                state.vad_cache = result["vad_cache"]

            latency_ms = int((time.perf_counter() - start) * 1000)
            is_final = bool(event["is_final"] or result.get("final_text"))
            output = DecodeOutput(
                session_id=session_id,
                sequence=sequence,
                text=state.final_text if is_final else state.partial_text,
                is_final=is_final,
                confidence=float(result.get("confidence", 0.0)),
                latency_ms=latency_ms,
            )

            if is_final:
                state.mark_closed()
                self.repository.delete(session_id)

            return output

这段代码体现了四个关键设计:

  • 每个 session 内串行,多个 session 间并发。
  • 通过微批处理提升 GPU 吞吐。
  • cache 与状态在 Worker 本地内存中维护,减少远程访问开销。
  • final 时及时回收状态,避免长尾连接挤爆内存。

6.5 模型管理器:预热、限流、降级缺一不可

在生产环境里,模型管理器不能只是“加载模型”。

它至少要具备:

  • 预热:避免第一批请求触发冷启动。
  • 并发闸门:限制同时进入 GPU 的推理数。
  • 降级:GPU 紧张时,允许部分租户切到 CPU 小模型或转离线。
  • 健康探测:连续错误时主动摘除实例。
import asyncio
import random
from contextlib import asynccontextmanager

class FunASRModelManager:
    def __init__(self, online_model: Any, max_inflight: int = 32):
        self.online_model = online_model
        self.semaphore = asyncio.Semaphore(max_inflight)
        self.ready = False

    async def warmup(self) -> None:
        dummy = {
            "audio_bytes": b"\x00" * 3200,
            "cache": {},
            "vad_cache": {},
            "hotwords": [],
            "is_final": False,
        }
        for _ in range(3):
            await self.batch_generate([dummy])
        self.ready = True

    @asynccontextmanager
    async def inflight_guard(self):
        await self.semaphore.acquire()
        try:
            yield
        finally:
            self.semaphore.release()

    async def batch_generate(self, requests: list[dict]) -> list[dict]:
        async with self.inflight_guard():
            await asyncio.sleep(0.005)
            results = []
            for req in requests:
                token = "测试文本" if req["audio_bytes"] else ""
                results.append(
                    {
                        "partial_text": token,
                        "final_text": token if req["is_final"] else "",
                        "confidence": round(random.uniform(0.92, 0.99), 4),
                        "cache": req["cache"],
                        "vad_cache": req["vad_cache"],
                    }
                )
            return results

注意这里的 Semaphore。在大多数线上事故里,真正拖垮 GPU 服务的不是平均负载,而是瞬时 inflight 数暴涨导致的排队堆积和显存抖动。

6.6 结果下行:partial 与 final 必须具备幂等语义

客户端常见问题不是“收不到结果”,而是“收到一堆看不懂的中间态”。

建议返回统一结果格式:

{
  "session_id": "s-001",
  "sequence": 23,
  "result_type": "partial",
  "text": "今天上午十点",
  "confidence": 0.97,
  "is_final": false,
  "version": 23,
  "trace_id": "t-8f0f7f6c"
}

设计原则:

  • result_type 明确区分 partialfinal
  • version 单调递增,客户端据此做覆盖,不做拼接猜测。
  • trace_id 贯穿接入层、推理层、结果层,便于追踪问题。

6.7 离线修正 Worker:让最终文本更适合业务入库

在线结果是给用户看的,离线修正结果通常才是给系统消费的。

典型离线后处理包括:

  • 全句重新解码
  • 标点恢复
  • ITN 归一化
  • 热词纠偏
  • 数字、金额、时间表达标准化
  • 敏感词标注和业务实体抽取
class OfflinePostProcessor:
    def __init__(self, punc_model: Any, itn_converter: Any):
        self.punc_model = punc_model
        self.itn_converter = itn_converter

    async def finalize(self, raw_text: str, metadata: dict) -> dict:
        text = raw_text.strip()
        punctuated = self.punc_model.restore(text)
        normalized = self.itn_converter.normalize(punctuated)
        return {
            "raw_text": raw_text,
            "punctuated_text": punctuated,
            "normalized_text": normalized,
            "tenant_id": metadata["tenant_id"],
            "session_id": metadata["session_id"],
        }

生产实践里,离线修正最好走异步链路,而不是阻塞在线响应。


七、从单机到集群的关键工程能力补齐

7.1 高并发下的四级流控模型

要让系统扛住高并发,仅靠某一个“限流中间件”是不够的。推荐使用四级流控:

第一级:租户级连接配额

控制某个租户最多能建立多少活跃 session,防止单租户打爆集群。

第二级:网关级速率限制

控制单位时间内的 chunk 推送频率,避免异常客户端疯狂发送无效包。

第三级:队列级背压

当 Kafka lag 过高或内部队列超过阈值时,网关应主动返回“系统繁忙”或降低更新频率。

第四级:GPU inflight 闸门

真正进入推理层的并发数必须受控,这是保护显存和尾延迟的最后一道防线。

很多系统只做前两级,不做后两级,于是表面上“没有拒绝请求”,实际上是让所有请求一起变慢并超时。

7.2 批处理不是越大越好

ASR 推理批处理的目标是提高 GPU 吞吐,但流式场景必须防止为了等 batch 而牺牲实时性。

经验上建议:

  • 使用小批量微批处理,如 4、8、16。
  • 设置最大等待窗口,如 5ms、10ms、15ms。
  • 高优先级租户或低延迟场景可以绕过批处理直通。

一个常见错误是把离线批量推理的思路直接套到流式场景,结果 batch 大了,平均吞吐上去了,但首字延迟和 P99 全坏了。

7.3 模型与热词的缓存策略

在多租户场景中,热词和模型版本都可能动态变化。建议采用两级缓存:

  • 本地进程缓存:热点模型、热点热词词典,追求访问速度。
  • Redis 元数据缓存:词典版本号、租户配置、失效通知。

推荐做法:

  • 热词文件不要每个 chunk 都动态重载。
  • 使用版本号和懒加载机制,只在版本变更时刷新本地缓存。
  • 模型版本切换采用双缓冲,避免热更新期间服务中断。

7.4 显存管理与 OOM 防护

GPU 服务最怕的是“看起来还能跑,实际上已经濒临雪崩”。

建议至少实现以下机制:

  • 启动预热,避免流量首批触发大量显存分配。
  • 推理前并发闸门,硬性限制 inflight。
  • 活跃流数指标暴露给 HPA。
  • OOM 时快速失败并摘除实例,不要让实例进入假死状态。
  • 大模型和小模型分池部署,不混跑。

在很多生产环境里,真正稳定的策略不是“单实例吞吐拉满”,而是“每实例保留 20% 左右安全冗余”,换取更稳定的尾延迟。

7.5 客户端协议必须考虑弱网与断线重连

语音实时系统不是后端单方面的游戏。客户端如果没有设计好,服务端再强也会被拖累。

建议客户端协议至少支持:

  • chunk 序号
  • ack 或结果版本号
  • 心跳保活
  • 断线重连时携带最近已确认 sequence
  • 明确的 startdataend 生命周期事件

如果没有这些机制,弱网下常见问题包括:

  • 音频重复上传
  • 结果乱序覆盖
  • 会话明明结束了,服务端状态却回收不了

八、Kubernetes 生产部署:不仅能跑,还要能扩、能观测、能恢复

8.1 Streaming Worker Deployment

下面给出一个更贴近生产的 K8s 部署示例。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: funasr-streaming
  labels:
    app: funasr-streaming
spec:
  replicas: 4
  selector:
    matchLabels:
      app: funasr-streaming
  template:
    metadata:
      labels:
        app: funasr-streaming
    spec:
      nodeSelector:
        accelerator: nvidia-a10
      containers:
        - name: worker
          image: registry.example.com/asr/funasr-streaming:1.2.0
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 8080
          env:
            - name: REDIS_URL
              value: "redis://redis-cluster.default.svc:6379/0"
            - name: KAFKA_BROKERS
              value: "kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092"
            - name: MAX_INFLIGHT
              value: "32"
            - name: MAX_BATCH_SIZE
              value: "8"
            - name: BATCH_WAIT_MS
              value: "10"
          readinessProbe:
            httpGet:
              path: /readyz
              port: 8080
            initialDelaySeconds: 20
            periodSeconds: 5
          livenessProbe:
            httpGet:
              path: /healthz
              port: 8080
            initialDelaySeconds: 60
            periodSeconds: 10
          resources:
            requests:
              cpu: "4"
              memory: "12Gi"
              nvidia.com/gpu: "1"
            limits:
              cpu: "8"
              memory: "16Gi"
              nvidia.com/gpu: "1"
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 15"]
---
apiVersion: v1
kind: Service
metadata:
  name: funasr-streaming
spec:
  selector:
    app: funasr-streaming
  ports:
    - port: 80
      targetPort: 8080

这里有两个常被忽略的点:

  • preStop 给连接和队列消费留出优雅摘流时间。
  • readinessProbe 不应该只检查进程活着,还要检查模型是否预热完成、Kafka/Redis 是否可用。

8.2 HPA:不要只看 CPU

ASR 场景下,如果 HPA 只依据 CPU,会非常失真。因为瓶颈往往在:

  • 活跃流数
  • 队列积压
  • GPU inflight
  • P95 延迟

推荐自定义指标:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: funasr-streaming-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: funasr-streaming
  minReplicas: 4
  maxReplicas: 30
  metrics:
    - type: Pods
      pods:
        metric:
          name: funasr_active_streams
        target:
          type: AverageValue
          averageValue: "60"
    - type: Pods
      pods:
        metric:
          name: funasr_p95_latency_ms
        target:
          type: AverageValue
          averageValue: "800"

8.3 KEDA:用 Kafka lag 驱动离线修正扩容

在线链路和离线链路的扩缩容方式应该不同。离线链路更适合用消息积压驱动:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: funasr-offline-worker
spec:
  scaleTargetRef:
    name: funasr-offline-worker
  minReplicaCount: 1
  maxReplicaCount: 50
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka.default.svc:9092
        topic: offline-finalize
        consumerGroup: offline-finalize-group
        lagThreshold: "200"

8.4 Ingress 与长连接配置

无论使用 Nginx Ingress 还是 Traefik,都要确保以下参数针对 WebSocket 做过调优:

  • 长连接超时
  • 读写超时
  • 最大请求体限制
  • TLS 终止位置
  • 空闲连接回收时间

如果这些默认值不改,线上经常会出现“模型没问题,但 WebSocket 莫名其妙断开”。


九、监控与可观测:不建立指标体系,就无法真正做优化

很多 ASR 系统上线后,只监控 CPU、内存和接口 QPS。这远远不够。

9.1 核心业务指标

建议至少监控以下指标:

指标 含义 价值
active_sessions 当前活跃会话数 衡量容量与租户占用
first_token_latency_ms 首字延迟 直接影响体验
final_latency_ms 最终结果延迟 影响字幕和质检入库
partial_update_interval_ms 中间结果刷新间隔 衡量流式平滑度
rtf 实时因子 小于 1 说明处理速度快于实时
queue_wait_ms 队列等待时间 定位高峰拥塞
gpu_inflight 正在占用 GPU 的推理数 保护显存和尾延迟
finalization_rate 最终结果完成率 衡量会话是否泄漏
hotword_hit_ratio 热词命中率 评估业务词优化效果

9.2 技术指标与资源指标

  • GPU 显存使用率
  • GPU SM 利用率
  • Pod 重启次数
  • Kafka lag
  • Redis RTT
  • 异常断连率
  • session 回收耗时

9.3 日志与链路追踪建议

每个 session 至少应带上:

  • trace_id
  • session_id
  • tenant_id
  • sequence
  • chunk_size
  • result_version
  • worker_id
  • model_version

一旦线上出现“某租户反馈识别慢”,你才能迅速判断是:

  • 客户端弱网
  • 网关限流
  • Kafka 积压
  • GPU 排队
  • VAD 阈值配置不合理
  • 某个模型版本异常

9.4 Prometheus 指标示例

from prometheus_client import Counter, Gauge, Histogram

ACTIVE_SESSIONS = Gauge("funasr_active_sessions", "current active sessions", ["tenant"])
FIRST_TOKEN_LATENCY = Histogram(
    "funasr_first_token_latency_ms",
    "first token latency in ms",
    buckets=(50, 100, 200, 300, 500, 800, 1200, 2000),
)
QUEUE_WAIT = Histogram(
    "funasr_queue_wait_ms",
    "queue wait time in ms",
    buckets=(1, 5, 10, 20, 50, 100, 200, 500),
)
GPU_INFLIGHT = Gauge("funasr_gpu_inflight", "inflight inference count")
FINAL_RESULTS = Counter("funasr_final_results_total", "final results count", ["tenant", "status"])

十、真实业务案例:客服质检平台如何从单机演进到集群

为了让方案更接地气,我们用一个典型场景来说明。

10.1 业务背景

某呼叫中心质检平台需要处理坐席和客户双声道语音,要求:

  • 实时展示通话字幕,便于质检席旁听
  • 通话结束后生成高质量转写文本
  • 自动识别违规话术、承诺用语、产品关键词
  • 工作日高峰并发 8000 路,低谷不足 500 路
  • 不同业务线拥有不同热词库和配额

10.2 初版架构为什么失败

团队最初采用“客户端直连 ASR 服务”的方式:

Browser -> Nginx -> FunASR Runtime

上线后出现了几个典型问题:

  • 高峰时单机 GPU 显存波动剧烈,频繁 OOM。
  • 同一通话在重连后被分到另一个实例,字幕回滚严重。
  • 通话结束后仍有大量 session 未清理,内存不断上涨。
  • 热词按请求实时加载,导致尾延迟明显上升。

10.3 改造方案

系统逐步演进为:

Client
 -> Gateway
 -> Kafka(audio-chunks, key=session_id)
 -> Streaming Worker
 -> Kafka(asr-results)
 -> Push Service / Storage
 -> Offline Finalize Worker

同时做了四项关键改造:

  • 热词由“每次请求加载”改为“租户级版本缓存 + 本地懒刷新”。
  • session 状态机标准化,最终结果后立即清理缓存。
  • 按租户做连接配额和优先级队列。
  • 监控从“接口耗时”升级为“首字延迟 + 队列等待 + inflight + lag”。

10.4 改造后的收益

在典型高峰窗口,系统观测到:

  • P95 首字延迟从 1.8s 降到 420ms。
  • P99 最终结果延迟从 4.7s 降到 1.3s。
  • GPU 平均利用率从 28% 提升到 63%。
  • OOM 事故基本消失。
  • 业务热词识别召回率提升约 11%。

更重要的是,系统从“出了问题靠人盯”变成了“可以被指标驱动治理”。


十一、百万并发设计方法:真正要扩的是接入、路由和治理,不只是推理节点

如果目标是百万级在线连接,建议把系统拆成四层容量模型。

11.1 第一层:连接层容量

关注

  • 长连接数
  • 心跳包频率
  • 网关内存占用
  • TLS 握手成本

优化方向

  • 将 WebSocket 接入层独立部署
  • 采用轻量连接状态
  • 对静音会话降低上传频率或边缘侧预抑制

11.2 第二层:消息层容量

关注

  • Kafka 分区数
  • 单分区吞吐
  • session key 分布均匀性
  • lag 与 rebalance 频率

优化方向

  • 提前规划分区数量
  • 按 session_id 做稳定哈希
  • 避免热点租户集中命中少量分区

11.3 第三层:推理层容量

关注

  • 每 GPU 可承载活跃流数
  • 平均 chunk 推理耗时
  • inflight 限制
  • 批处理效率

优化方向

  • 按模型规格分池
  • 小模型承接低优先级租户
  • 热路径、冷路径分离
  • 通过压测建立规格基线

11.4 第四层:治理层容量

关注

  • 多租户配额
  • 热词配置传播延迟
  • 灰度发布风险
  • 全链路告警与熔断

优化方向

  • 租户分级
  • 金丝雀发布
  • 按租户或模型版本做隔离池
  • 自动降级策略

所以,所谓“百万并发”的真正答案不是一句“多加几台 GPU”,而是:

把连接、消息、推理、治理拆开,各自独立扩容,各自建立容量模型。


十二、压测方法论:别让错误的压测结论误导架构决策

12.1 需要模拟的不是请求数,而是会话行为

流式语音的压测不能只发 HTTP 请求。你需要模拟:

  • 会话建立
  • chunk 连续上行
  • 不同讲话速率
  • 不同静音分布
  • 弱网和抖动
  • 断线重连

12.2 推荐压测指标

  • 单实例最大稳定活跃流数
  • 首字延迟 P50/P95/P99
  • 最终结果延迟 P95/P99
  • 队列等待时间
  • GPU 利用率与显存水位
  • 异常断开率
  • session 泄漏率

12.3 压测场景要覆盖峰谷和异常流量

建议至少做四类压测:

  1. 稳态压测:验证日常负载是否稳定。
  2. 突增压测:验证削峰与背压能力。
  3. 长稳压测:验证内存泄漏和 session 回收。
  4. 故障压测:验证实例重启、Kafka 抖动、Redis 超时后的恢复能力。

12.4 k6 + WebSocket 思路示例

import ws from "k6/ws";
import { check, sleep } from "k6";

export const options = {
  vus: 200,
  duration: "5m",
};

export default function () {
  const sessionId = `s-${__VU}-${Date.now()}`;
  const url = `wss://asr.example.com/ws/asr?tenant_id=test&session_id=${sessionId}`;

  const response = ws.connect(url, {}, function (socket) {
    socket.on("open", function () {
      for (let i = 0; i < 40; i++) {
        const fakeChunk = "0011223344556677";
        socket.sendBinary(hexToBytes(fakeChunk));
        sleep(0.1);
      }
      socket.send(JSON.stringify({ type: "end" }));
    });

    socket.on("message", function (_message) {});
    socket.on("close", function () {});
  });

  check(response, { "status is 101": (r) => r && r.status === 101 });
}

function hexToBytes(hex) {
  const bytes = new Uint8Array(hex.length / 2);
  for (let i = 0; i < hex.length; i += 2) {
    bytes[i / 2] = parseInt(hex.substring(i, i + 2), 16);
  }
  return bytes.buffer;
}

十三、生产落地清单:上线前必须逐项确认

为了方便工程团队直接执行,这里给出一份可操作的上线检查清单。

13.1 模型与推理

  • 是否完成模型预热?
  • 是否设置最大 inflight 限制?
  • 是否区分在线模型与离线修正模型?
  • 是否完成热词缓存设计,而非每次动态重载?

13.2 会话与协议

  • session_id 是否全链路唯一?
  • chunk 是否带 sequence?
  • 结果是否有 version,客户端是否按版本覆盖?
  • 会话结束后是否有显式 close/finalize 逻辑?

13.3 架构与基础设施

  • 是否保证 session 有序性?
  • 是否有削峰缓冲层?
  • 是否支持按租户限流和配额?
  • 是否具备自动扩缩容?

13.4 监控与告警

  • 是否监控首字延迟、最终延迟、队列等待、lag、inflight?
  • 是否有按租户、按模型版本的维度拆分?
  • 是否能追踪单个 session 的完整链路?

13.5 稳定性与容灾

  • Pod 重启是否优雅摘流?
  • Kafka 积压时是否有降级策略?
  • Redis 不可用时系统是否可退化运行?
  • OOM 后实例是否能被快速摘除并恢复?

十四、常见误区与反模式

最后再总结几个最常见、也最容易踩坑的反模式。

误区一:把流式识别当普通 HTTP 接口

流式识别是长连接、有状态、顺序敏感的系统,不是简单的请求响应服务。

误区二:用共享缓存承载高频张量状态

Redis 适合存元数据,不适合承载高频 cache 张量。高频推理状态应尽量留在 Worker 本地。

误区三:只追求单实例吞吐最大化

把单实例压到极限,通常意味着尾延迟、抖动和故障恢复能力都会变差。

误区四:只看平均延迟

语音场景真正影响体验的是 P95/P99 和首字延迟,而不是平均值。

误区五:在线链路和离线链路混在一起

实时展示和最终入库的优化目标不同,混在一起只会两边都做不好。


十五、结语:从“能识别”到“能运营”的最后一公里

FunASR 的价值,从来不只是“一个识别模型”,而是它为工业级语音场景提供了一个非常好的起点。真正决定项目成败的,不是你是否把 API 调通,而是你是否把以下几个问题做成系统能力:

  • 会话是否有序、状态是否可控
  • 延迟是否可拆解、可优化、可观测
  • 高峰流量是否可削峰、可背压、可降级
  • 多租户是否可隔离、可限流、可治理
  • 在线与离线是否分层,准确率与实时性是否各得其所

从单机推理到百万并发,真正的演进路径不是“把模型放大”,而是“把架构做对”。

当你把接入、路由、状态、推理、后处理、观测、调度、治理全部串起来之后,FunASR 才会从一个 Demo,真正成长为一套可以支撑业务规模化增长的生产级实时语音平台。

如果要把全文压缩成一句话,那就是:

生产级 FunASR 架构的本质,不是如何把一次推理跑起来,而是如何让海量实时会话,在可控成本下稳定、有序、低延迟地持续运行。

希望这篇文章的分享能为你的 FunASR 生产化实践提供清晰的路径。欢迎在云栈社区分享你的实践经验和遇到的挑战。




上一篇:Claude突设身份验证门槛,开发者AI编程选型标准重回“可用性”博弈
下一篇:Kafka架构实战:亿级实时语音事件流的分布式设计
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-18 21:18 , Processed in 0.867247 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表