当 Demo 能跑通时,我们解决的是“模型可用”;当系统要上线时,我们解决的是“业务可交付”;当并发、成本、稳定性和多租户一起压上来时,真正要回答的问题其实只有一个:怎样把 FunASR 从一个推理程序,演进成一个生产级实时语音平台。
本文不再停留于“如何启动一个容器”或“怎样调用一次 WebSocket 接口”,而是从架构师和一线工程人员的视角,系统回答以下四个问题:
- FunASR 流式识别到底是怎么工作的,为什么 chunk、cache、VAD、热词会直接影响延迟和准确率?
- 为什么单机部署一旦进入生产就容易出现排队、显存抖动、尾延迟飙升和会话乱序?
- 怎样从单机推理,演进到支持高并发、弹性扩缩容、会话有序、可观测、可治理的分布式系统?
- 怎样补齐生产代码、核心配置、监控指标、压测方法与真实业务案例,让文章不只“能看”,而且“能落地”?
如果你正在建设客服质检、会议纪要、语音机器人、车载语音、直播字幕、实时翻译等系统,这篇文章会更贴近真实场景。
一、为什么 FunASR Demo 在生产环境里容易失效
很多团队第一次接触 FunASR,路径都很相似:
- 下载模型。
- 跑通官方 Demo。
- 本地传一段音频,结果还不错。
- 上线后发现延迟、并发、稳定性、成本全都不对。
问题并不在于 FunASR 本身,而在于我们常常把“模型推理成功”误当成“系统工程成功”。
生产环境里的流式语音识别,至少要同时满足以下目标:
- 低延迟:首字延迟、句尾延迟、P95/P99 尾延迟可控。
- 高并发:成百上千甚至更多会话同时在线,且不能相互挤占导致雪崩。
- 高可用:节点宕机、网络抖动、客户端断线重连、消息重复投递都能兜住。
- 高精度:VAD、在线模型、离线二次修正、热词、标点、ITN 要协同工作。
- 低成本:GPU 利用率、实例规格、批处理策略、冷热分层要能持续优化。
- 可治理:多租户隔离、限流、配额、审计、灰度、监控、告警要完善。
真正把系统打崩的,通常不是“模型不准”,而是下面这些工程问题:
- 会话状态无法粘住:同一个 session 被负载均衡到了不同节点,导致 cache 失效,识别上下文断裂。
- 显存被长尾连接吃满:连接不断开、缓存不回收、batch 不受控,GPU 很快进入抖动状态。
- VAD 和流式状态机不稳定:讲话停顿稍长就误切句,造成在线结果频繁回滚。
- 同步串行链路过长:网关、鉴权、热词、VAD、ASR、标点全部串行,尾延迟迅速恶化。
- 高峰流量没有缓冲层:一波洪峰直接打到推理进程,队列积压和超时同时发生。
- 压测指标失真:只看平均 RT,不看首字延迟、实时因子 RTF、GPU SM 利用率和队列等待时间。
所以,本文的目标不是“教你启动 FunASR”,而是“教你构建一套生产级流式 ASR 体系”。
二、FunASR 流式识别的核心原理:不是调用接口这么简单
要做好架构,先要理解模型和流式机制本身。否则所有优化都只是“调参数碰运气”。
2.1 FunASR 的能力边界
从工程视角看,FunASR 不是单个模型,而是一个语音理解处理链:
- VAD:识别哪里是有效语音,哪里是静音。
- Online ASR:对持续输入的音频块进行流式识别,尽快给出中间结果。
- Offline ASR:在句子结束或完整片段到齐后,做高精度二次修正。
- Punctuation:给最终结果补标点。
- ITN:将文本从口语形态转换为标准书写形态,如“二零二六年四月十六日”转为“2026年4月16日”。
- Hotword:通过热词增强改善业务关键词、人名、地名、产品名的召回。
因此,生产系统优化的不只是 ASR 模型,而是整条链路的协同效率。
传统自回归 ASR 的核心问题是:每个 token 的生成依赖上一个 token,天然串行,延迟高,吞吐受限。
Paraformer 属于非自回归端到端模型,它通过 Predictor 先估计输出长度,再并行生成 token,从而显著降低解码串行开销。它之所以在工业场景有优势,关键在于三点:
- 推理更容易并行化,吞吐更高。
- 延迟与输出长度耦合更弱,更适合实时字幕和语音机器人。
- 更适合与 chunk cache 配合,构建流式增量推理链路。
可以把传统自回归理解成“边写边想”,把 Paraformer 理解成“先想结构,再快速落笔”。前者更像逐字生成,后者更像并行补全。
2.3 流式识别为什么必须使用 chunk
真实音频不是完整文件一次性提交,而是按时间不断到达。系统不可能等用户说完整句话再识别,否则首字延迟会极差。
因此,流式识别一定要把音频切成 chunk:
- 每个 chunk 是一个很小的时间片,如 60ms、100ms、200ms。
- 模型对每个 chunk 做增量计算,而不是每次重算全部历史。
- 系统在 chunk 之间维护上下文缓存,保证跨块语义连续。
chunk 的大小决定了两个核心指标:
- chunk 越小:首字更快,但 CPU/GPU 调度更频繁,系统开销更高,精度也可能下降。
- chunk 越大:吞吐更稳、精度更高,但实时性变差,首字延迟会上升。
在工程中,chunk 不是越小越好,而是要结合业务 SLO 找平衡:
- 实时字幕:更关注首字延迟,chunk 往往偏小。
- 客服质检:更关注稳定性和成本,chunk 可以略大。
- 车载场景:既要快,又要抗抖动,通常需要自适应策略。
2.4 cache 是流式推理性能的生命线
如果没有 cache,每来一个 chunk 都从头计算历史音频,复杂度会迅速失控。
FunASR 流式识别的关键机制之一,就是在 session 级别维护 cache:
- encoder cache:缓存历史声学特征编码状态。
- decoder/predictor cache:缓存历史上下文或对齐信息。
- vad state:缓存静音、讲话段边界状态。
- partial text state:缓存当前句的中间识别文本,供客户端增量展示。
这意味着一个非常重要的架构约束:
同一个 session 的 chunk,必须按顺序进入同一个状态机实例。
如果你让同一会话的 chunk 在多个 Pod 之间随机漂移,缓存就无法复用,轻则延迟升高,重则结果错乱。
2.5 VAD 为什么常常是延迟和体验的真正决定因素
很多团队一上来只盯着 ASR 推理耗时,却忽略了 VAD 对用户体验的影响更大。
一个典型链路的延迟可拆分为:
| 环节 |
典型耗时 |
说明 |
| 音频采集与上传 |
20ms - 150ms |
与客户端实现、网络质量相关 |
| 网关接入与鉴权 |
5ms - 30ms |
取决于是否有额外业务逻辑 |
| 队列等待 |
0ms - 数百 ms |
高峰期最容易失控 |
| VAD 判定 |
50ms - 400ms |
句尾等待尤其关键 |
| 在线 ASR 推理 |
20ms - 120ms |
与模型、batch、GPU 相关 |
| 标点/ITN/后处理 |
5ms - 50ms |
往往被低估 |
| 结果回传 |
10ms - 80ms |
与网络和订阅方式相关 |
很多时候,用户觉得“字幕慢”,并不是 GPU 慢,而是:
- VAD 在等更长的静音窗口确认句尾。
- 网关把请求排进了内部队列。
- 在线结果被设计成“频繁回滚”,导致客户端观感很差。
因此,生产级优化必须区分三类延迟:
- 首字延迟 First Token Latency
- 中间增量更新间隔 Incremental Update Interval
- 最终结果确认延迟 Finalization Latency
2.6 2-Pass 架构为什么是工业实践中的常态
纯在线识别的优势是快,但通常不如完整上下文下的离线识别稳定。纯离线识别的优势是准,但体验太差。
所以工业系统通常采用 2-Pass:
- 第一阶段:在线流式模型输出低延迟中间结果。
- 第二阶段:句子结束后使用完整上下文做离线修正,补齐标点、ITN、热词校正。
这也是为什么生产系统里往往会同时存在两条路径:
- 热路径:面向用户实时回显,追求快。
- 冷路径:面向最终文本入库、质检、检索和摘要,追求准。
如果你把这两条路径混在一起,就很容易出现“为了更准把实时性拖死”或者“为了更快导致入库文本不可用”的问题。
三、从单机到平台:FunASR 生产架构的五个演进阶段
生产架构的演进,本质上是不断解决新的系统瓶颈。下面给出一个更贴近现实的五阶段模型。
3.1 阶段 0:单进程原型
最简形式通常是:
Client -> WebSocket Server -> FunASR Model -> Result
这个阶段适合做:
- 模型效果验证
- 客户端协议联调
- 热词机制和结果格式设计
但它的问题也很明显:
- 无法隔离接入层和推理层
- 没有水平扩展能力
- 连接数稍高就会把进程拖垮
- 没有会话治理、熔断、限流、监控
3.2 阶段 1:单机多进程 + Nginx 接入
Client -> Nginx -> ASR Worker 1..N -> Local Model Cache
这一步解决的是基本接入问题:
- 用 Nginx 支持 WebSocket 升级
- 用 supervisor/systemd 管理多进程
- 用本地共享模型目录减少重复加载
适用场景:
瓶颈:
- 依然是单机资源池
- GPU 无法灵活切分和调度
- 重启、发布、扩容都依赖单点主机
3.3 阶段 2:接入层与推理层分离
Client -> Gateway -> Session Router -> ASR Pods
这一步是体系化建设的起点。你不再让客户端直连模型服务,而是引入网关和会话路由层。
职责拆分如下:
- Gateway:鉴权、租户识别、协议转换、限流、心跳、断连处理。
- Session Router:确保同一个 session 的 chunk 有序地进入同一条处理链。
- ASR Pods:只负责推理,不处理复杂业务状态。
这一步的核心价值是:
- 接入复杂度与推理复杂度解耦
- 多租户和安全控制有了承载点
- 后续引入 Kafka、Redis、K8s 都有了结构基础
3.4 阶段 3:事件驱动化,加入 Kafka 与 Redis
Client -> Gateway -> Kafka(audio-chunks)
-> Streaming Workers
-> Kafka(asr-results)
-> Push Service
这是从“同步调用系统”走向“流式平台”的关键一步。
为什么一定要引入消息队列?
- 用于削峰填谷,隔离突发洪峰。
- 用于显式背压,防止把推理进程直接打爆。
- 用于解耦热路径与冷路径。
- 用于审计与回放,便于问题排查和离线重算。
Redis 则主要承担三类状态:
- session 状态:cache 索引、最后活跃时间、租户上下文。
- 热词配置:租户级和会话级热词动态加载。
- 防重与分布式锁:避免重复 close、重复 finalization。
这一阶段,系统第一次具备“像平台一样运行”的基础。
3.5 阶段 4:Kubernetes + HPA/KEDA 弹性调度
当日常并发达到几百到几千路,且不同租户的峰谷差异显著时,静态部署已经无法承载。
Kubernetes 的价值在于:
- 把 GPU 资源池化
- 通过 HPA 根据活跃流数、CPU、内存等扩缩容
- 通过 KEDA 根据 Kafka lag 自动拉起离线修正 Worker
- 配合 Ingress、Service Mesh、Prometheus 构建治理体系
这一步不仅是“自动扩容”,更重要的是把“算力调度”变成系统能力。
3.6 阶段 5:百万并发平台的本质,不是百万路 GPU 直连
很多文章写“百万并发”,喜欢直接给出一个宏大标题,但没有解释系统语义。严格来说,“百万并发”并不等于“百万路音频实时在 GPU 上同时解码”。
在实时语音平台里,百万并发往往意味着:
- 百万级长连接在线
- 数十万级会话处于活跃或准活跃状态
- 某个时间窗口内,真正进入 ASR 推理层的有效流量远低于连接数
- 通过多级缓冲、静音抑制、边缘预处理、热冷分层和租户配额,系统才有可能在成本可接受的情况下承载海量连接
换句话说,百万并发平台拼的不是“模型本身”,而是 接入架构、流控机制、状态治理、异步化和资源调度。
四、生产级总体架构:控制面与数据面分离
要把系统做大,最重要的架构升级之一,就是把控制面和数据面分开。
4.1 数据面:承载高频音频流
数据面负责真正的高频实时请求:
- WebSocket/WebRTC 音频上行
- chunk 编排
- session route
- streaming inference
- partial/final result push
数据面的要求是:
- 低延迟
- 少依赖
- 最少业务逻辑
- 避免同步调用外部系统
4.2 控制面:负责治理与配置
控制面负责低频但复杂的操作:
- 租户配置
- 热词管理
- 配额和限流策略
- 模型版本与灰度发布
- 告警阈值和监控配置
- 审计和追踪
控制面变化频率低,但逻辑复杂。把它混进数据面,只会让实时链路变脆弱。
4.3 推荐总体架构
+-------------------------+
| Control Plane |
| tenant, quota, hotword |
| config, model release |
+-----------+-------------+
|
v
+------------+ +---------------------------+ +----------------------+
| Clients | -> | Gateway / Session Router | ---> | Kafka: audio-chunks |
| ws/webrtc | | auth, route, rate-limit | +----------+-----------+
+------------+ +---------------------------+ |
v
+--------------------------+
| Streaming ASR Workers |
| session cache, VAD, ASR |
+------------+-------------+
|
+---------------------------+---------+--------------------+
| | |
v v v
+-------------------+ +-------------------------+ +-------------------+
| Redis Cluster | | Kafka: asr-results | | Trace / Metrics |
| session/hotwords | | partial/final results | | logs/rtf/latency |
+-------------------+ +------------+------------+ +-------------------+
|
v
+--------------------------+
| Push Service / Consumers |
| realtime push / storage |
+-------------+------------+
|
v
+--------------------------+
| Offline 2-Pass Workers |
| punc, ITN, correction |
+--------------------------+
这个架构的关键点有三个:
- 音频流和控制配置解耦。
- 实时在线识别和离线修正解耦。
- session 有序性和全局扩展性同时满足。
五、会话有序性是架构成败的分水岭
流式 ASR 和普通 HTTP 服务最大的区别,就是它是 有状态流处理。
5.1 为什么必须保证 session stickiness
一个流式会话包含:
- 当前音频偏移量
- 上下文 cache
- VAD 状态
- 当前句 partial text
- 已加载的租户热词
- 最近一次结果序号
如果一个 session 的 chunk 被轮询到不同实例,会出现:
- cache 丢失,推理耗时急剧上升
- 句子切分失真
- partial result 回退异常
- final result 与 partial result 无法对应
5.2 三种常见实现方式
方案一:网关本地一致性哈希
优点:
缺点:
适合中小规模集群。
方案二:Kafka 按 session_id 分区
优点:
- 天然保证同一 session 顺序
- 容易做回放和异步消费
缺点:
适合生产级分布式系统。
方案三:独立 Session Router
优点:
缺点:
适合多租户大规模平台。
工程实践里,最常见的稳定组合是:
Gateway 做接入和轻路由,Kafka 按 session_id 分区保证有序,Worker 持有本地 cache,Redis 只存索引和元数据,不存高频中间张量。
这个边界很重要。不要试图把模型 cache 序列化后频繁写 Redis,那样会把低延迟系统做成高延迟系统。
六、生产级代码实战:从接入到推理的完整骨架
下面给出一套更接近生产系统的代码结构。示例以 Python 为主,重点展示架构思路,而不是依赖某个特定 SDK 版本。
6.1 目录结构建议
funasr-platform/
├── app/
│ ├── gateway/
│ │ ├── ws_server.py
│ │ ├── auth.py
│ │ └── limiter.py
│ ├── router/
│ │ └── session_router.py
│ ├── worker/
│ │ ├── stream_worker.py
│ │ ├── model_pool.py
│ │ ├── vad_state.py
│ │ └── postprocess.py
│ ├── infra/
│ │ ├── kafka_bus.py
│ │ ├── redis_repo.py
│ │ ├── metrics.py
│ │ └── tracing.py
│ └── domain/
│ ├── session.py
│ ├── events.py
│ └── result.py
├── deploy/
│ ├── docker-compose.yml
│ ├── k8s/
│ └── nginx/
└── tests/
├── load/
└── integration/
6.2 网关层:负责接入、鉴权、限流、协议校验
网关层不要直接持有模型。它应该只做四件事:
- 识别租户和用户身份
- 生成或校验 session_id
- 做速率限制和连接配额
- 把 chunk 封装成事件推送给后端
import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from typing import Optional
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis.asyncio import Redis
app = FastAPI()
@dataclass
class AudioChunkEvent:
session_id: str
tenant_id: str
sequence: int
audio_bytes: bytes
is_final: bool
send_ts: float
hotwords: list[str]
class ConnectionLimiter:
def __init__(self, redis: Redis, tenant_limit: int = 2000):
self.redis = redis
self.tenant_limit = tenant_limit
async def acquire(self, tenant_id: str, session_id: str) -> bool:
key = f"tenant:{tenant_id}:active_sessions"
value = await self.redis.scard(key)
if value >= self.tenant_limit:
return False
await self.redis.sadd(key, session_id)
await self.redis.expire(key, 3600)
return True
async def release(self, tenant_id: str, session_id: str) -> None:
await self.redis.srem(f"tenant:{tenant_id}:active_sessions", session_id)
class KafkaProducerAdapter:
async def send_audio_chunk(self, event: AudioChunkEvent) -> None:
# 实际环境中应使用 aiokafka,key=session_id 保证同分区顺序
payload = {
"session_id": event.session_id,
"tenant_id": event.tenant_id,
"sequence": event.sequence,
"audio_bytes": event.audio_bytes.hex(),
"is_final": event.is_final,
"send_ts": event.send_ts,
"hotwords": event.hotwords,
}
_ = json.dumps(payload)
redis_client = Redis.from_url("redis://localhost:6379/0", decode_responses=True)
limiter = ConnectionLimiter(redis_client)
producer = KafkaProducerAdapter()
async def authenticate(ws: WebSocket) -> tuple[str, str]:
token = ws.query_params.get("token", "")
tenant_id = ws.query_params.get("tenant_id", "default")
user_id = "anonymous" if not token else f"user-{token[-6:]}"
return tenant_id, user_id
@app.websocket("/ws/asr")
async def asr_websocket(ws: WebSocket):
await ws.accept()
tenant_id, user_id = await authenticate(ws)
session_id = ws.query_params.get("session_id") or str(uuid.uuid4())
hotwords = [w for w in ws.query_params.get("hotwords", "").split(",") if w]
acquired = await limiter.acquire(tenant_id, session_id)
if not acquired:
await ws.send_json({"code": 429, "message": "tenant session quota exceeded"})
await ws.close()
return
sequence = 0
try:
await ws.send_json(
{
"code": 0,
"message": "connected",
"session_id": session_id,
"user_id": user_id,
}
)
while True:
message = await ws.receive()
if "bytes" in message and message["bytes"] is not None:
sequence += 1
await producer.send_audio_chunk(
AudioChunkEvent(
session_id=session_id,
tenant_id=tenant_id,
sequence=sequence,
audio_bytes=message["bytes"],
is_final=False,
send_ts=time.time(),
hotwords=hotwords,
)
)
elif "text" in message and message["text"] is not None:
payload = json.loads(message["text"])
if payload.get("type") == "end":
sequence += 1
await producer.send_audio_chunk(
AudioChunkEvent(
session_id=session_id,
tenant_id=tenant_id,
sequence=sequence,
audio_bytes=b"",
is_final=True,
send_ts=time.time(),
hotwords=hotwords,
)
)
await ws.send_json({"code": 0, "message": "closing"})
break
except WebSocketDisconnect:
pass
finally:
await limiter.release(tenant_id, session_id)
await ws.close()
这段代码体现了三个生产级要点:
- 连接配额按租户控制,而不是所有请求共用一个粗暴限流。
- 音频上行和模型推理解耦,通过事件总线处理。
session_id + sequence 是后续结果去重、重排、回放的基础。
6.3 领域模型:显式表示会话状态
很多系统稳定性差,不是因为模型,而是因为状态都散落在各种 dict 里,没人知道一个 session 当前到底处于什么阶段。
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
import time
class SessionStage(str, Enum):
CREATED = "created"
STREAMING = "streaming"
FINALIZING = "finalizing"
CLOSED = "closed"
@dataclass
class SessionState:
session_id: str
tenant_id: str
stage: SessionStage = SessionStage.CREATED
last_sequence: int = 0
partial_text: str = ""
final_text: str = ""
vad_cache: dict[str, Any] = field(default_factory=dict)
asr_cache: dict[str, Any] = field(default_factory=dict)
hotwords: list[str] = field(default_factory=list)
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
def touch(self) -> None:
self.updated_at = time.time()
def validate_sequence(self, sequence: int) -> bool:
return sequence == self.last_sequence + 1
def mark_streaming(self, sequence: int) -> None:
self.stage = SessionStage.STREAMING
self.last_sequence = sequence
self.touch()
def mark_finalizing(self, sequence: int) -> None:
self.stage = SessionStage.FINALIZING
self.last_sequence = sequence
self.touch()
def mark_closed(self) -> None:
self.stage = SessionStage.CLOSED
self.touch()
生产系统里,显式状态机会带来三个收益:
- 出问题时可以定位 session 卡在哪个阶段。
- 重试、幂等、回收逻辑有统一边界。
- 监控指标可以直接从状态机导出。
6.4 推理 Worker:把“单模型调用”升级为“可批处理的状态机执行器”
真正的难点在这里。流式 Worker 不只是接个 chunk 调一次模型,而是要兼顾:
- 按 session 顺序执行
- 维护 cache
- 控制 batch 规模
- 在低延迟和高吞吐之间动态折中
下面给出一个可落地的执行骨架。
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any
@dataclass
class DecodeOutput:
session_id: str
sequence: int
text: str
is_final: bool
confidence: float
latency_ms: int
class ModelPool:
def __init__(self, model_manager: Any, max_batch_size: int = 8, batch_wait_ms: int = 15):
self.model_manager = model_manager
self.max_batch_size = max_batch_size
self.batch_wait_ms = batch_wait_ms
self.pending: list[tuple[dict, asyncio.Future]] = []
self.lock = asyncio.Lock()
self.loop_task = asyncio.create_task(self._batch_loop())
async def infer(self, request: dict) -> dict:
future = asyncio.get_running_loop().create_future()
async with self.lock:
self.pending.append((request, future))
return await future
async def _batch_loop(self) -> None:
while True:
await asyncio.sleep(self.batch_wait_ms / 1000.0)
async with self.lock:
if not self.pending:
continue
batch = self.pending[: self.max_batch_size]
self.pending = self.pending[self.max_batch_size :]
requests = [item[0] for item in batch]
futures = [item[1] for item in batch]
try:
results = await self.model_manager.batch_generate(requests)
for future, result in zip(futures, results):
if not future.done():
future.set_result(result)
except Exception as exc:
for future in futures:
if not future.done():
future.set_exception(exc)
class SessionRepository:
def __init__(self):
self.sessions: dict[str, SessionState] = {}
def get_or_create(self, session_id: str, tenant_id: str) -> SessionState:
state = self.sessions.get(session_id)
if state is None:
state = SessionState(session_id=session_id, tenant_id=tenant_id)
self.sessions[session_id] = state
return state
def delete(self, session_id: str) -> None:
self.sessions.pop(session_id, None)
class StreamingWorker:
def __init__(self, model_pool: ModelPool, repository: SessionRepository):
self.model_pool = model_pool
self.repository = repository
self.session_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
async def handle_chunk(self, event: dict) -> DecodeOutput:
session_id = event["session_id"]
tenant_id = event["tenant_id"]
sequence = event["sequence"]
start = time.perf_counter()
async with self.session_locks[session_id]:
state = self.repository.get_or_create(session_id, tenant_id)
if not state.validate_sequence(sequence):
raise ValueError(
f"out-of-order chunk: session={session_id}, expected={state.last_sequence + 1}, got={sequence}"
)
request = {
"audio_bytes": bytes.fromhex(event["audio_bytes"]),
"cache": state.asr_cache,
"vad_cache": state.vad_cache,
"hotwords": event.get("hotwords", state.hotwords),
"is_final": event["is_final"],
}
if event["is_final"]:
state.mark_finalizing(sequence)
else:
state.mark_streaming(sequence)
result = await self.model_pool.infer(request)
state.partial_text = result.get("partial_text", state.partial_text)
if result.get("final_text"):
state.final_text = result["final_text"]
if result.get("cache") is not None:
state.asr_cache = result["cache"]
if result.get("vad_cache") is not None:
state.vad_cache = result["vad_cache"]
latency_ms = int((time.perf_counter() - start) * 1000)
is_final = bool(event["is_final"] or result.get("final_text"))
output = DecodeOutput(
session_id=session_id,
sequence=sequence,
text=state.final_text if is_final else state.partial_text,
is_final=is_final,
confidence=float(result.get("confidence", 0.0)),
latency_ms=latency_ms,
)
if is_final:
state.mark_closed()
self.repository.delete(session_id)
return output
这段代码体现了四个关键设计:
- 每个 session 内串行,多个 session 间并发。
- 通过微批处理提升 GPU 吞吐。
- cache 与状态在 Worker 本地内存中维护,减少远程访问开销。
- final 时及时回收状态,避免长尾连接挤爆内存。
6.5 模型管理器:预热、限流、降级缺一不可
在生产环境里,模型管理器不能只是“加载模型”。
它至少要具备:
- 预热:避免第一批请求触发冷启动。
- 并发闸门:限制同时进入 GPU 的推理数。
- 降级:GPU 紧张时,允许部分租户切到 CPU 小模型或转离线。
- 健康探测:连续错误时主动摘除实例。
import asyncio
import random
from contextlib import asynccontextmanager
class FunASRModelManager:
def __init__(self, online_model: Any, max_inflight: int = 32):
self.online_model = online_model
self.semaphore = asyncio.Semaphore(max_inflight)
self.ready = False
async def warmup(self) -> None:
dummy = {
"audio_bytes": b"\x00" * 3200,
"cache": {},
"vad_cache": {},
"hotwords": [],
"is_final": False,
}
for _ in range(3):
await self.batch_generate([dummy])
self.ready = True
@asynccontextmanager
async def inflight_guard(self):
await self.semaphore.acquire()
try:
yield
finally:
self.semaphore.release()
async def batch_generate(self, requests: list[dict]) -> list[dict]:
async with self.inflight_guard():
await asyncio.sleep(0.005)
results = []
for req in requests:
token = "测试文本" if req["audio_bytes"] else ""
results.append(
{
"partial_text": token,
"final_text": token if req["is_final"] else "",
"confidence": round(random.uniform(0.92, 0.99), 4),
"cache": req["cache"],
"vad_cache": req["vad_cache"],
}
)
return results
注意这里的 Semaphore。在大多数线上事故里,真正拖垮 GPU 服务的不是平均负载,而是瞬时 inflight 数暴涨导致的排队堆积和显存抖动。
6.6 结果下行:partial 与 final 必须具备幂等语义
客户端常见问题不是“收不到结果”,而是“收到一堆看不懂的中间态”。
建议返回统一结果格式:
{
"session_id": "s-001",
"sequence": 23,
"result_type": "partial",
"text": "今天上午十点",
"confidence": 0.97,
"is_final": false,
"version": 23,
"trace_id": "t-8f0f7f6c"
}
设计原则:
result_type 明确区分 partial 和 final。
version 单调递增,客户端据此做覆盖,不做拼接猜测。
trace_id 贯穿接入层、推理层、结果层,便于追踪问题。
6.7 离线修正 Worker:让最终文本更适合业务入库
在线结果是给用户看的,离线修正结果通常才是给系统消费的。
典型离线后处理包括:
- 全句重新解码
- 标点恢复
- ITN 归一化
- 热词纠偏
- 数字、金额、时间表达标准化
- 敏感词标注和业务实体抽取
class OfflinePostProcessor:
def __init__(self, punc_model: Any, itn_converter: Any):
self.punc_model = punc_model
self.itn_converter = itn_converter
async def finalize(self, raw_text: str, metadata: dict) -> dict:
text = raw_text.strip()
punctuated = self.punc_model.restore(text)
normalized = self.itn_converter.normalize(punctuated)
return {
"raw_text": raw_text,
"punctuated_text": punctuated,
"normalized_text": normalized,
"tenant_id": metadata["tenant_id"],
"session_id": metadata["session_id"],
}
生产实践里,离线修正最好走异步链路,而不是阻塞在线响应。
七、从单机到集群的关键工程能力补齐
7.1 高并发下的四级流控模型
要让系统扛住高并发,仅靠某一个“限流中间件”是不够的。推荐使用四级流控:
第一级:租户级连接配额
控制某个租户最多能建立多少活跃 session,防止单租户打爆集群。
第二级:网关级速率限制
控制单位时间内的 chunk 推送频率,避免异常客户端疯狂发送无效包。
第三级:队列级背压
当 Kafka lag 过高或内部队列超过阈值时,网关应主动返回“系统繁忙”或降低更新频率。
第四级:GPU inflight 闸门
真正进入推理层的并发数必须受控,这是保护显存和尾延迟的最后一道防线。
很多系统只做前两级,不做后两级,于是表面上“没有拒绝请求”,实际上是让所有请求一起变慢并超时。
7.2 批处理不是越大越好
ASR 推理批处理的目标是提高 GPU 吞吐,但流式场景必须防止为了等 batch 而牺牲实时性。
经验上建议:
- 使用小批量微批处理,如 4、8、16。
- 设置最大等待窗口,如 5ms、10ms、15ms。
- 高优先级租户或低延迟场景可以绕过批处理直通。
一个常见错误是把离线批量推理的思路直接套到流式场景,结果 batch 大了,平均吞吐上去了,但首字延迟和 P99 全坏了。
7.3 模型与热词的缓存策略
在多租户场景中,热词和模型版本都可能动态变化。建议采用两级缓存:
- 本地进程缓存:热点模型、热点热词词典,追求访问速度。
- Redis 元数据缓存:词典版本号、租户配置、失效通知。
推荐做法:
- 热词文件不要每个 chunk 都动态重载。
- 使用版本号和懒加载机制,只在版本变更时刷新本地缓存。
- 模型版本切换采用双缓冲,避免热更新期间服务中断。
7.4 显存管理与 OOM 防护
GPU 服务最怕的是“看起来还能跑,实际上已经濒临雪崩”。
建议至少实现以下机制:
- 启动预热,避免流量首批触发大量显存分配。
- 推理前并发闸门,硬性限制 inflight。
- 活跃流数指标暴露给 HPA。
- OOM 时快速失败并摘除实例,不要让实例进入假死状态。
- 大模型和小模型分池部署,不混跑。
在很多生产环境里,真正稳定的策略不是“单实例吞吐拉满”,而是“每实例保留 20% 左右安全冗余”,换取更稳定的尾延迟。
7.5 客户端协议必须考虑弱网与断线重连
语音实时系统不是后端单方面的游戏。客户端如果没有设计好,服务端再强也会被拖累。
建议客户端协议至少支持:
- chunk 序号
- ack 或结果版本号
- 心跳保活
- 断线重连时携带最近已确认 sequence
- 明确的
start、data、end 生命周期事件
如果没有这些机制,弱网下常见问题包括:
- 音频重复上传
- 结果乱序覆盖
- 会话明明结束了,服务端状态却回收不了
八、Kubernetes 生产部署:不仅能跑,还要能扩、能观测、能恢复
8.1 Streaming Worker Deployment
下面给出一个更贴近生产的 K8s 部署示例。
apiVersion: apps/v1
kind: Deployment
metadata:
name: funasr-streaming
labels:
app: funasr-streaming
spec:
replicas: 4
selector:
matchLabels:
app: funasr-streaming
template:
metadata:
labels:
app: funasr-streaming
spec:
nodeSelector:
accelerator: nvidia-a10
containers:
- name: worker
image: registry.example.com/asr/funasr-streaming:1.2.0
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8080
env:
- name: REDIS_URL
value: "redis://redis-cluster.default.svc:6379/0"
- name: KAFKA_BROKERS
value: "kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092"
- name: MAX_INFLIGHT
value: "32"
- name: MAX_BATCH_SIZE
value: "8"
- name: BATCH_WAIT_MS
value: "10"
readinessProbe:
httpGet:
path: /readyz
port: 8080
initialDelaySeconds: 20
periodSeconds: 5
livenessProbe:
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 60
periodSeconds: 10
resources:
requests:
cpu: "4"
memory: "12Gi"
nvidia.com/gpu: "1"
limits:
cpu: "8"
memory: "16Gi"
nvidia.com/gpu: "1"
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 15"]
---
apiVersion: v1
kind: Service
metadata:
name: funasr-streaming
spec:
selector:
app: funasr-streaming
ports:
- port: 80
targetPort: 8080
这里有两个常被忽略的点:
preStop 给连接和队列消费留出优雅摘流时间。
readinessProbe 不应该只检查进程活着,还要检查模型是否预热完成、Kafka/Redis 是否可用。
8.2 HPA:不要只看 CPU
ASR 场景下,如果 HPA 只依据 CPU,会非常失真。因为瓶颈往往在:
- 活跃流数
- 队列积压
- GPU inflight
- P95 延迟
推荐自定义指标:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: funasr-streaming-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: funasr-streaming
minReplicas: 4
maxReplicas: 30
metrics:
- type: Pods
pods:
metric:
name: funasr_active_streams
target:
type: AverageValue
averageValue: "60"
- type: Pods
pods:
metric:
name: funasr_p95_latency_ms
target:
type: AverageValue
averageValue: "800"
8.3 KEDA:用 Kafka lag 驱动离线修正扩容
在线链路和离线链路的扩缩容方式应该不同。离线链路更适合用消息积压驱动:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: funasr-offline-worker
spec:
scaleTargetRef:
name: funasr-offline-worker
minReplicaCount: 1
maxReplicaCount: 50
triggers:
- type: kafka
metadata:
bootstrapServers: kafka.default.svc:9092
topic: offline-finalize
consumerGroup: offline-finalize-group
lagThreshold: "200"
8.4 Ingress 与长连接配置
无论使用 Nginx Ingress 还是 Traefik,都要确保以下参数针对 WebSocket 做过调优:
- 长连接超时
- 读写超时
- 最大请求体限制
- TLS 终止位置
- 空闲连接回收时间
如果这些默认值不改,线上经常会出现“模型没问题,但 WebSocket 莫名其妙断开”。
九、监控与可观测:不建立指标体系,就无法真正做优化
很多 ASR 系统上线后,只监控 CPU、内存和接口 QPS。这远远不够。
9.1 核心业务指标
建议至少监控以下指标:
| 指标 |
含义 |
价值 |
| active_sessions |
当前活跃会话数 |
衡量容量与租户占用 |
| first_token_latency_ms |
首字延迟 |
直接影响体验 |
| final_latency_ms |
最终结果延迟 |
影响字幕和质检入库 |
| partial_update_interval_ms |
中间结果刷新间隔 |
衡量流式平滑度 |
| rtf |
实时因子 |
小于 1 说明处理速度快于实时 |
| queue_wait_ms |
队列等待时间 |
定位高峰拥塞 |
| gpu_inflight |
正在占用 GPU 的推理数 |
保护显存和尾延迟 |
| finalization_rate |
最终结果完成率 |
衡量会话是否泄漏 |
| hotword_hit_ratio |
热词命中率 |
评估业务词优化效果 |
9.2 技术指标与资源指标
- GPU 显存使用率
- GPU SM 利用率
- Pod 重启次数
- Kafka lag
- Redis RTT
- 异常断连率
- session 回收耗时
9.3 日志与链路追踪建议
每个 session 至少应带上:
- trace_id
- session_id
- tenant_id
- sequence
- chunk_size
- result_version
- worker_id
- model_version
一旦线上出现“某租户反馈识别慢”,你才能迅速判断是:
- 客户端弱网
- 网关限流
- Kafka 积压
- GPU 排队
- VAD 阈值配置不合理
- 某个模型版本异常
9.4 Prometheus 指标示例
from prometheus_client import Counter, Gauge, Histogram
ACTIVE_SESSIONS = Gauge("funasr_active_sessions", "current active sessions", ["tenant"])
FIRST_TOKEN_LATENCY = Histogram(
"funasr_first_token_latency_ms",
"first token latency in ms",
buckets=(50, 100, 200, 300, 500, 800, 1200, 2000),
)
QUEUE_WAIT = Histogram(
"funasr_queue_wait_ms",
"queue wait time in ms",
buckets=(1, 5, 10, 20, 50, 100, 200, 500),
)
GPU_INFLIGHT = Gauge("funasr_gpu_inflight", "inflight inference count")
FINAL_RESULTS = Counter("funasr_final_results_total", "final results count", ["tenant", "status"])
十、真实业务案例:客服质检平台如何从单机演进到集群
为了让方案更接地气,我们用一个典型场景来说明。
10.1 业务背景
某呼叫中心质检平台需要处理坐席和客户双声道语音,要求:
- 实时展示通话字幕,便于质检席旁听
- 通话结束后生成高质量转写文本
- 自动识别违规话术、承诺用语、产品关键词
- 工作日高峰并发 8000 路,低谷不足 500 路
- 不同业务线拥有不同热词库和配额
10.2 初版架构为什么失败
团队最初采用“客户端直连 ASR 服务”的方式:
Browser -> Nginx -> FunASR Runtime
上线后出现了几个典型问题:
- 高峰时单机 GPU 显存波动剧烈,频繁 OOM。
- 同一通话在重连后被分到另一个实例,字幕回滚严重。
- 通话结束后仍有大量 session 未清理,内存不断上涨。
- 热词按请求实时加载,导致尾延迟明显上升。
10.3 改造方案
系统逐步演进为:
Client
-> Gateway
-> Kafka(audio-chunks, key=session_id)
-> Streaming Worker
-> Kafka(asr-results)
-> Push Service / Storage
-> Offline Finalize Worker
同时做了四项关键改造:
- 热词由“每次请求加载”改为“租户级版本缓存 + 本地懒刷新”。
- session 状态机标准化,最终结果后立即清理缓存。
- 按租户做连接配额和优先级队列。
- 监控从“接口耗时”升级为“首字延迟 + 队列等待 + inflight + lag”。
10.4 改造后的收益
在典型高峰窗口,系统观测到:
- P95 首字延迟从 1.8s 降到 420ms。
- P99 最终结果延迟从 4.7s 降到 1.3s。
- GPU 平均利用率从 28% 提升到 63%。
- OOM 事故基本消失。
- 业务热词识别召回率提升约 11%。
更重要的是,系统从“出了问题靠人盯”变成了“可以被指标驱动治理”。
十一、百万并发设计方法:真正要扩的是接入、路由和治理,不只是推理节点
如果目标是百万级在线连接,建议把系统拆成四层容量模型。
11.1 第一层:连接层容量
关注:
- 长连接数
- 心跳包频率
- 网关内存占用
- TLS 握手成本
优化方向:
- 将 WebSocket 接入层独立部署
- 采用轻量连接状态
- 对静音会话降低上传频率或边缘侧预抑制
11.2 第二层:消息层容量
关注:
- Kafka 分区数
- 单分区吞吐
- session key 分布均匀性
- lag 与 rebalance 频率
优化方向:
- 提前规划分区数量
- 按 session_id 做稳定哈希
- 避免热点租户集中命中少量分区
11.3 第三层:推理层容量
关注:
- 每 GPU 可承载活跃流数
- 平均 chunk 推理耗时
- inflight 限制
- 批处理效率
优化方向:
- 按模型规格分池
- 小模型承接低优先级租户
- 热路径、冷路径分离
- 通过压测建立规格基线
11.4 第四层:治理层容量
关注:
- 多租户配额
- 热词配置传播延迟
- 灰度发布风险
- 全链路告警与熔断
优化方向:
- 租户分级
- 金丝雀发布
- 按租户或模型版本做隔离池
- 自动降级策略
所以,所谓“百万并发”的真正答案不是一句“多加几台 GPU”,而是:
把连接、消息、推理、治理拆开,各自独立扩容,各自建立容量模型。
十二、压测方法论:别让错误的压测结论误导架构决策
12.1 需要模拟的不是请求数,而是会话行为
流式语音的压测不能只发 HTTP 请求。你需要模拟:
- 会话建立
- chunk 连续上行
- 不同讲话速率
- 不同静音分布
- 弱网和抖动
- 断线重连
12.2 推荐压测指标
- 单实例最大稳定活跃流数
- 首字延迟 P50/P95/P99
- 最终结果延迟 P95/P99
- 队列等待时间
- GPU 利用率与显存水位
- 异常断开率
- session 泄漏率
12.3 压测场景要覆盖峰谷和异常流量
建议至少做四类压测:
- 稳态压测:验证日常负载是否稳定。
- 突增压测:验证削峰与背压能力。
- 长稳压测:验证内存泄漏和 session 回收。
- 故障压测:验证实例重启、Kafka 抖动、Redis 超时后的恢复能力。
12.4 k6 + WebSocket 思路示例
import ws from "k6/ws";
import { check, sleep } from "k6";
export const options = {
vus: 200,
duration: "5m",
};
export default function () {
const sessionId = `s-${__VU}-${Date.now()}`;
const url = `wss://asr.example.com/ws/asr?tenant_id=test&session_id=${sessionId}`;
const response = ws.connect(url, {}, function (socket) {
socket.on("open", function () {
for (let i = 0; i < 40; i++) {
const fakeChunk = "0011223344556677";
socket.sendBinary(hexToBytes(fakeChunk));
sleep(0.1);
}
socket.send(JSON.stringify({ type: "end" }));
});
socket.on("message", function (_message) {});
socket.on("close", function () {});
});
check(response, { "status is 101": (r) => r && r.status === 101 });
}
function hexToBytes(hex) {
const bytes = new Uint8Array(hex.length / 2);
for (let i = 0; i < hex.length; i += 2) {
bytes[i / 2] = parseInt(hex.substring(i, i + 2), 16);
}
return bytes.buffer;
}
十三、生产落地清单:上线前必须逐项确认
为了方便工程团队直接执行,这里给出一份可操作的上线检查清单。
13.1 模型与推理
- 是否完成模型预热?
- 是否设置最大 inflight 限制?
- 是否区分在线模型与离线修正模型?
- 是否完成热词缓存设计,而非每次动态重载?
13.2 会话与协议
- session_id 是否全链路唯一?
- chunk 是否带 sequence?
- 结果是否有 version,客户端是否按版本覆盖?
- 会话结束后是否有显式 close/finalize 逻辑?
13.3 架构与基础设施
- 是否保证 session 有序性?
- 是否有削峰缓冲层?
- 是否支持按租户限流和配额?
- 是否具备自动扩缩容?
13.4 监控与告警
- 是否监控首字延迟、最终延迟、队列等待、lag、inflight?
- 是否有按租户、按模型版本的维度拆分?
- 是否能追踪单个 session 的完整链路?
13.5 稳定性与容灾
- Pod 重启是否优雅摘流?
- Kafka 积压时是否有降级策略?
- Redis 不可用时系统是否可退化运行?
- OOM 后实例是否能被快速摘除并恢复?
十四、常见误区与反模式
最后再总结几个最常见、也最容易踩坑的反模式。
误区一:把流式识别当普通 HTTP 接口
流式识别是长连接、有状态、顺序敏感的系统,不是简单的请求响应服务。
误区二:用共享缓存承载高频张量状态
Redis 适合存元数据,不适合承载高频 cache 张量。高频推理状态应尽量留在 Worker 本地。
误区三:只追求单实例吞吐最大化
把单实例压到极限,通常意味着尾延迟、抖动和故障恢复能力都会变差。
误区四:只看平均延迟
语音场景真正影响体验的是 P95/P99 和首字延迟,而不是平均值。
误区五:在线链路和离线链路混在一起
实时展示和最终入库的优化目标不同,混在一起只会两边都做不好。
十五、结语:从“能识别”到“能运营”的最后一公里
FunASR 的价值,从来不只是“一个识别模型”,而是它为工业级语音场景提供了一个非常好的起点。真正决定项目成败的,不是你是否把 API 调通,而是你是否把以下几个问题做成系统能力:
- 会话是否有序、状态是否可控
- 延迟是否可拆解、可优化、可观测
- 高峰流量是否可削峰、可背压、可降级
- 多租户是否可隔离、可限流、可治理
- 在线与离线是否分层,准确率与实时性是否各得其所
从单机推理到百万并发,真正的演进路径不是“把模型放大”,而是“把架构做对”。
当你把接入、路由、状态、推理、后处理、观测、调度、治理全部串起来之后,FunASR 才会从一个 Demo,真正成长为一套可以支撑业务规模化增长的生产级实时语音平台。
如果要把全文压缩成一句话,那就是:
生产级 FunASR 架构的本质,不是如何把一次推理跑起来,而是如何让海量实时会话,在可控成本下稳定、有序、低延迟地持续运行。
希望这篇文章的分享能为你的 FunASR 生产化实践提供清晰的路径。欢迎在云栈社区分享你的实践经验和遇到的挑战。