这不是一篇“调用几个 API 拼起来”的 Demo 文,而是一篇面向生产落地的实时语音翻译系统架构实战。我们会从链路拆解、核心原理、微服务分层、Kafka 事件模型、FunASR 流式接入、Java 生产级代码、K8s 弹性部署、容量规划与故障治理几个维度,完整讲清楚一个可支撑高并发、低延迟、可观测、可扩展的实时语音翻译机器人该如何设计。
一、为什么这类系统难,难在哪里?
很多团队第一次做“实时语音翻译”,直觉往往是:
- 用户上传音频
- 调 ASR 拿文本
- 调翻译接口
- 调 TTS 合成播报
Demo 当然能跑,但一旦进入真实业务,问题会迅速暴露:
- 单会话链路长,任何一个环节抖动都会放大总延迟。
- 音频是流式输入,不是天然按句子边界到达,必须解决分段与会话上下文问题。
- ASR、翻译、TTS 的性能模型完全不同,无法用同一种扩容策略处理。
- GPU 资源昂贵,不能用“多加机器”这种粗暴方式解决问题。
- 多租户场景下,术语表、语言对、音色、优先级都不同,系统必须支持租户级隔离与策略编排。
- 实时链路追求 P95/P99 延迟,离线链路追求吞吐与成本,两者优化目标天然冲突。
所以,真正的难点不在“模型能不能识别”,而在以下五件事:
- 如何把实时链路拆成可治理的阶段。
- 如何在高并发下做削峰、背压、隔离与降级。
- 如何保证最终一致性与幂等,避免重复转写、重复翻译、重复播报。
- 如何把 GPU 密集型任务和普通 Java 服务分层调度。
- 如何通过监控、日志、Trace、指标来定位延迟与错误来源。
一句话总结:
生产级实时语音翻译系统,本质上是“流式音频处理 + 事件驱动架构 + AI 推理调度 + 分布式治理”的综合工程问题。
二、先定义目标:什么才叫“生产级”?
在架构设计前,先定义目标,否则系统会一路朝着错误方向演进。
2.1 典型业务场景
这类系统常见于以下场景:
- 国际会议同传:延迟敏感,要求字幕和语音尽量同步。
- 海外直播间翻译:高并发、多语言、需要会话隔离。
- 智能客服语音机器人:需要双向语音、上下文记忆和容错重试。
- 跨国协同办公:强调术语表、行业词典和专有名词控制。
- 长音频批量转写翻译:对实时性要求不高,但对吞吐和成本极其敏感。
2.2 建议的系统指标
以“实时会议翻译机器人”为例,建议给系统设定明确 SLO:
| 指标 |
目标 |
| 首屏字幕延迟 |
P95 < 800ms |
| 语音翻译端到端延迟 |
P95 < 2.5s |
| 单集群在线会话数 |
10 万+ |
| 峰值音频片段处理能力 |
100 万并发片段 |
| 可用性 |
99.95% |
| 重复消息容忍 |
可重放但不重复生效 |
| 数据丢失 |
核心链路 0 可接受丢失 |
注意,“百万并发”在这里更准确地说,是百万级别的并发片段/事件处理能力,而不是百万个 Java 线程。系统依赖的是分段、批处理、事件流、异步编排和资源池化。
三、实时语音翻译链路的本质:不是一条调用链,而是一条事件流
一条完整的语音翻译请求,通常会经历以下处理阶段:
- 音频接入
- 编解码与格式归一化
- VAD 语音活动检测
- 分段切片
- ASR 流式识别
- 文本标准化
- 上下文聚合
- 机器翻译
- 术语修正
- TTS 合成
- 字幕/音频回推
- 会话归档与指标上报
这条链路里有三个核心原理,必须讲清楚。
3.1 原理一:VAD 决定了系统是否“像实时”
流式语音翻译不是等用户说完再统一识别,而是边接收边判断何时形成可识别片段。
VAD 的职责是识别:
- 什么时候语音开始
- 什么时候语音结束
- 当前静音是否已经足够长,值得触发一次识别
如果 VAD 切得太短:
- ASR 上下文不足
- 识别结果不稳定
- Kafka 事件量暴增
- 系统吞吐被元数据放大
如果 VAD 切得太长:
FunASR 内置的 FSMN-VAD 很适合这类场景,因为它支持低时延端点检测,能把语音片段切到“足够短但不碎”的平衡点。
3.2 原理二:ASR 与翻译不是一对一同步关系
很多实现把“拿到一句识别结果”立刻送去翻译,这在简单场景下可以工作,但在真实会话里会产生两个问题:
- ASR 的中间结果经常回滚修正,早翻译会导致字幕闪烁。
- 翻译往往需要前后文,否则术语、指代、时态容易错。
因此,更合理的做法是把 ASR 输出分成两类:
partial:中间稿,用于实时字幕预览,可低成本翻译或仅展示原文。
final:最终稿,进入稳定翻译与 TTS。
这意味着系统需要一层会话聚合器,专门负责把多个 ASR 片段拼成可翻译的最小语义单元。
3.3 原理三:系统瓶颈通常不在 Java,而在 GPU 与外部推理服务
Java 服务主要承担:
- 连接接入
- 事件编排
- 状态管理
- 消息路由
- 重试补偿
- 观测治理
真正昂贵的是:
所以架构重点不是“让 Java 更快”,而是:
- 让昂贵推理尽可能高利用率
- 让便宜的 CPU 服务承担复杂调度逻辑
- 让异步链路吸收峰值
- 让系统在 GPU 紧张时优雅退化
四、总体架构:双通道、事件驱动、状态外置、推理分层
我们把系统拆成四层:
4.1 全局架构图
┌──────────────────────────┐
│ Client SDK │
│ Web / App / Bot / SIP │
└────────────┬─────────────┘
│
WebSocket / HTTP │ gRPC
│
┌───────────────────────▼───────────────────────┐
│ Gateway / Session Access │
│ Spring Cloud Gateway + Netty + Auth + Quota │
└────────────┬───────────────────────┬───────────┘
│ │
│ │
┌────────────────▼────────────┐ ┌──────▼────────────────┐
│ Realtime Orchestrator │ │ Batch Submit Service │
│ Session / VAD / Routing │ │ Long Audio Ingest │
└────────────┬────────────────┘ └──────────┬─────────────┘
│ │
│ audio events │ job events
▼ ▼
┌────────────────────────────────────────────────────┐
│ Kafka Cluster │
│ raw-audio / asr-result / translate / tts / dlq │
└──────┬───────────────────────┬─────────────────────┘
│ │
┌─────────────▼─────────────┐ ┌──────▼────────────────────┐
│ ASR Worker Pool │ │ Translation Worker Pool │
│ FunASR Gateway / GPU │ │ NMT / LLM Router │
└─────────────┬─────────────┘ └──────┬────────────────────┘
│ │
└─────────────┬────────┘
▼
┌──────────────────────┐
│ TTS Worker Pool │
│ GPU / CPU Mixed │
└──────────┬───────────┘
│
▼
┌────────────────────────────────┐
│ Result Push / Playback Service │
│ Subtitle / Audio / Archive │
└────────────────────────────────┘
4.2 为什么要“双通道”
一个生产系统不要把所有流量塞进一条链路。
通道一:低延迟实时链路
适合:
特点:
- WebSocket 长连接
- 内存态会话
- 状态缓存驻留 Redis
- 优先保证延迟
- 允许部分结果先返回
通道二:高吞吐异步链路
适合:
特点:
- Kafka 异步消息
- 任务级重试与补偿
- 关注吞吐、成本和最终一致性
- 支持批处理与离线并发扩展
如果不做双通道,实时请求会被大任务拖垮,离线任务也会被实时高峰挤压。
五、核心组件选型与架构理由
5.1 为什么选择 FunASR
FunASR 适合构建私有化、低延迟语音识别链路,主要原因有:
- 支持流式识别,适合实时字幕与机器人场景。
- VAD、PUNC、ASR 一体化,工程集成成本低。
- 模型部署轻量,便于 GPU 资源池化。
- 对中文场景友好,热词与定制化支持更实用。
它并不是“万能最佳”,但在中文实时识别、私有化部署、成本可控这三个维度上,通常比简单依赖云厂商商业 API 更适合长期演进。
5.2 为什么选择 Kafka 而不是只用 Redis Stream 或 RabbitMQ
实时语音翻译链路本质是持续事件流,Kafka 更适合承担主骨架:
- 高吞吐,适合大量短片段事件。
- 分区模型天然支持水平扩展。
- 消费重放能力强,适合回溯与补偿。
- 配合幂等生产者和事务消费,便于实现“至少一次 + 幂等生效”。
Redis Stream 更适合轻量队列或短链路缓存协同,不适合承接超大规模事件主干。RabbitMQ 在复杂路由上有优势,但在超高吞吐流场景下,Kafka 的扩展性和保留能力更匹配。
5.3 为什么 Java 侧选 Spring Boot 3 + WebFlux
原因很直接:
- 接入层大量长连接,阻塞式模型成本高。
- WebFlux + Netty 更适合高并发 I/O 编排。
- 与 Spring Kafka、Micrometer、Resilience4j、Spring Security 生态结合成熟。
- Java 在企业治理、配置中心、链路追踪、熔断限流、灰度发布上更容易标准化。
注意,这里不是说业务服务全部响应式就一定更好,而是:
接入与编排层适合响应式,重 CPU 或重状态的处理层则可以保持清晰的同步式边界。
六、事件模型设计:消息体决定了后续十年的可维护性
很多系统的架构问题,根源不在代码,而在消息模型过于草率。
6.1 统一事件头设计
建议所有 Kafka 事件都带统一头字段:
{
"eventId": "01JQZ6Y4PZ7F6B8A9R2D1M3X4C",
"traceId": "d5dcb7f7ccf44c8e8c81234f98e20abc",
"sessionId": "room-9288-user-1001",
"tenantId": "acme",
"eventType": "ASR_FINAL",
"sourceLang": "zh-CN",
"targetLang": "en-US",
"sequence": 182,
"timestamp": 1776201058123,
"payload": {}
}
字段说明:
eventId:幂等键,保证重复消费不重复生效。
traceId:全链路 Trace 透传。
sessionId:会话聚合主键。
tenantId:多租户隔离依据。
sequence:有序性控制,防止片段乱序。
eventType:便于统一消费模型。
6.2 推荐的主题划分
建议至少拆成以下主题:
audio.raw.segment
audio.vad.segment
asr.partial.result
asr.final.result
translation.request
translation.result
tts.request
tts.result
session.archive
speech.pipeline.dlq
不要把所有消息塞进一个 Topic 再靠字段区分,后期扩容、回放、权限隔离、数据保留期都会很痛苦。
6.3 分区键该怎么选
建议按 sessionId 或 sessionId + speakerId 做分区键。
原因:
- 同一会话片段要尽量进入同一分区,保证局部有序。
- 不同会话之间天然并行。
- 多说话人场景可用
sessionId + speakerId 提升并发度。
切忌直接按 tenantId 分区,否则一个大租户会变成热点分区。
七、关键工程设计:如何从“能跑”升级到“能扛”
7.1 会话状态外置
实时翻译系统不能把状态只放在单个实例内存里,因为:
- Pod 重启会丢状态
- 无法水平扩容
- 无法做多节点迁移
建议状态分层:
- 短期连接态:在网关实例本地内存
- 会话关键态:Redis
- 长期事件态:Kafka + 对象存储 + OLAP
Redis 建议存:
- 当前会话语言对
- 最后一个确认片段序号
- 最近 N 条 ASR final 文本
- 术语表版本
- TTS 音色配置
- 连接路由信息
7.2 背压治理
高并发场景下,背压必须显式设计,否则某个慢组件会一路拖垮前面所有服务。
典型策略:
- Gateway 限制每会话最大上行码率
- Realtime Orchestrator 对每个 session 设置本地队列上限
- Kafka consumer 使用 pause/resume 控制拉取节奏
- GPU worker 限制并发 in-flight 数量
- 外部翻译接口加入 bulkhead + rate limit
背压不是“发现慢了再加机器”,而是在架构层承认不同阶段处理速度不同。
7.3 幂等与重试
实时语音链路里,重试是一定会发生的:
- Kafka rebalance
- 网络闪断
- 下游超时
- 模型服务返回 5xx
解决原则:
- 消息投递允许至少一次
- 业务生效必须幂等
- 外部副作用必须去重
典型去重点:
- ASR final 结果写入时按
eventId 去重
- 翻译结果写入时按
sessionId + sequence + targetLang 去重
- TTS 音频产物按
textHash + voice + lang 做缓存键
7.4 降级策略
生产系统不能只有“成功”和“失败”两种状态,必须设计灰度降级。
建议降级顺序:
- 关闭润色型 LLM,仅保留 NMT 主翻译
- 关闭同声播报,只保留字幕
- 关闭
partial 翻译,只保留 final 翻译
- 对低优先级租户启用排队
- 切到离线任务模式
这样在高峰或 GPU 故障时,系统仍能“有损可用”。
八、生产级微服务拆分建议
建议拆成以下服务,而不是一个大应用全包:
8.1 speech-gateway
职责:
- 鉴权
- WebSocket 接入
- 心跳
- 配额控制
- 会话注册
- 将音频帧转成内部事件
8.2 session-orchestrator
职责:
- 会话状态机
- VAD 编排
- 片段聚合
- partial/final 切换
- 路由到 ASR / 翻译 / TTS
8.3 asr-worker
职责:
- 调用 FunASR WebSocket/HTTP 推理服务
- 处理 partial/final 文本
- 结构化输出
- 失败重试
8.4 translation-worker
职责:
- NMT 与 LLM 路由
- 术语表注入
- 上下文拼接
- 输出标准化
8.5 tts-worker
职责:
- 选择音色
- 文本切分
- 合成与缓存
- 音频片段存储与分发
8.6 result-dispatcher
职责:
8.7 ops-console
职责:
- 租户配置
- 术语表管理
- 限流与优先级
- 观测面板
- 回放与审计
九、Java 生产级实现:核心代码示例
下面不写“玩具代码”,而是给出更贴近生产结构的实现方式。示例采用 Spring Boot 3、WebFlux、Spring Kafka、Micrometer、Resilience4j。
十、领域模型与事件定义
10.1 统一事件对象
package com.example.speech.domain;
import java.time.Instant;
import java.util.Map;
public record SpeechEvent<T>(
String eventId,
String traceId,
String sessionId,
String tenantId,
String eventType,
String sourceLang,
String targetLang,
long sequence,
Instant occurredAt,
Map<String, String> attributes,
T payload
) {
}
10.2 音频片段事件
package com.example.speech.domain;
public record AudioSegmentPayload(
String codec,
int sampleRate,
int channels,
boolean finalSegment,
byte[] audioBytes,
long startMs,
long endMs
) {
}
10.3 ASR 结果事件
package com.example.speech.domain;
import java.util.List;
public record AsrResultPayload(
String text,
boolean partial,
boolean stable,
List<String> tokens,
double confidence,
long beginMs,
long endMs
) {
}
10.4 翻译结果事件
package com.example.speech.domain;
import java.util.List;
public record TranslationResultPayload(
String sourceText,
String translatedText,
String engine,
boolean llmEnhanced,
List<String> glossaryHits
) {
}
十一、接入层:WebSocket 网关设计
11.1 为什么网关层不能直接做全部业务
很多团队在 Gateway 里一边接收音频,一边直接调 ASR。问题在于:
- 网关会变成状态怪兽
- 无法横向拆分
- 发布风险高
- 高峰期连接数与业务计算耦合
正确方式是:网关只做连接接入与轻量预处理,业务编排交给后端服务。
11.2 WebSocket 音频接入控制器
package com.example.speech.gateway;
import com.example.speech.domain.AudioSegmentPayload;
import com.example.speech.domain.SpeechEvent;
import com.example.speech.service.AudioIngressService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.BinaryMessage;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
@Component
@RequiredArgsConstructor
public class SpeechWebSocketHandler implements WebSocketHandler {
private final AudioIngressService audioIngressService;
@Override
public Mono<Void> handle(WebSocketSession session) {
String sessionId = session.getId();
String tenantId = session.getHandshakeInfo().getHeaders().getFirst("X-Tenant-Id");
String traceId = UUID.randomUUID().toString().replace("-", "");
Flux<WebSocketMessage> outbound = session.receive()
.ofType(BinaryMessage.class)
.concatMap(message -> {
byte[] bytes = new byte[message.getPayload().readableByteCount()];
message.getPayload().read(bytes);
SpeechEvent<AudioSegmentPayload> event = new SpeechEvent<>(
UUID.randomUUID().toString(),
traceId,
sessionId,
tenantId == null ? "default" : tenantId,
"AUDIO_RAW_SEGMENT",
"zh-CN",
"en-US",
System.nanoTime(),
Instant.now(),
Map.of("contentType", "audio/pcm"),
new AudioSegmentPayload("pcm_s16le", 16000, 1, false, bytes, 0, 0)
);
return audioIngressService.accept(event)
.thenReturn(session.textMessage("{\"type\":\"ack\"}"));
})
.onErrorResume(ex -> Mono.just(session.textMessage("{\"type\":\"error\",\"msg\":\"bad audio frame\"}")));
return session.send(outbound);
}
}
这个 Handler 做了三件事:
- 接收二进制音频帧
- 补齐统一事件头
- 丢给下游编排服务
它没有做:
这是正确的分层方式。
十二、入口治理:限流、配额、连接保护
12.1 每会话限速器
package com.example.speech.gateway;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
public class SessionRateLimiter {
private final long maxBytesPerSecond;
private final AtomicLong currentWindowBytes = new AtomicLong();
private volatile long windowStart = System.nanoTime();
public SessionRateLimiter(long maxBytesPerSecond) {
this.maxBytesPerSecond = maxBytesPerSecond;
}
public boolean allow(int bytes) {
long now = System.nanoTime();
if (now - windowStart > Duration.ofSeconds(1).toNanos()) {
windowStart = now;
currentWindowBytes.set(0);
}
return currentWindowBytes.addAndGet(bytes) <= maxBytesPerSecond;
}
}
作用:
- 防止恶意客户端疯狂推音频
- 防止异常客户端破坏下游节奏
- 给会话级背压提供第一道阀门
12.2 推荐的接入保护策略
- 单租户最大连接数
- 单会话最大连续静默时长
- 单帧最大大小校验
- JWT 鉴权
- 租户配额白名单
- 心跳超时剔除
十三、Kafka 生产端:高可靠事件投递
13.1 Kafka 生产者配置
package com.example.speech.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfiguration {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
这几项配置非常关键:
acks=all:保证副本确认后再返回。
enable.idempotence=true:避免重复投递。
max.in.flight=1:降低重试打乱顺序的风险。
compression=lz4:在高频小消息场景下更节省带宽。
13.2 音频事件写入服务
package com.example.speech.service;
import com.example.speech.domain.AudioSegmentPayload;
import com.example.speech.domain.SpeechEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class AudioIngressService {
private static final String TOPIC = "audio.raw.segment";
private final KafkaTemplate<String, Object> kafkaTemplate;
public Mono<Void> accept(SpeechEvent<AudioSegmentPayload> event) {
return Mono.fromFuture(kafkaTemplate.send(TOPIC, event.sessionId(), event).completable())
.then();
}
}
十四、会话编排层:系统真正的中枢
如果只记住本文一个结论,请记住这句:
实时语音翻译系统的关键不在单个模型,而在会话编排层。
编排层负责:
- 管控 session 生命周期
- 管控 partial/final 的衔接
- 维持文本上下文窗口
- 控制翻译触发时机
- 控制 TTS 是否播报、何时播报
14.1 会话状态模型
package com.example.speech.session;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
public class SessionState {
private final String sessionId;
private final String sourceLang;
private final String targetLang;
private long lastConfirmedSequence;
private Instant lastActiveAt;
private final Deque<String> finalTextWindow = new ArrayDeque<>();
public SessionState(String sessionId, String sourceLang, String targetLang) {
this.sessionId = sessionId;
this.sourceLang = sourceLang;
this.targetLang = targetLang;
this.lastActiveAt = Instant.now();
}
public void confirmSequence(long sequence) {
this.lastConfirmedSequence = sequence;
this.lastActiveAt = Instant.now();
}
public void appendFinalText(String text) {
if (finalTextWindow.size() >= 6) {
finalTextWindow.removeFirst();
}
finalTextWindow.addLast(text);
this.lastActiveAt = Instant.now();
}
public String contextText() {
return String.join(" ", finalTextWindow);
}
}
14.2 会话状态仓库接口
package com.example.speech.session;
import reactor.core.publisher.Mono;
public interface SessionStateRepository {
Mono<SessionState> get(String sessionId);
Mono<Void> save(SessionState state);
Mono<Void> delete(String sessionId);
}
生产环境建议使用 Redis Hash 或 Redis JSON 存储,并设置 TTL。
十五、FunASR 集成:Java 如何稳妥调用流式 ASR
实际落地中,Java 通常不会直接加载 FunASR 模型,而是接入官方或定制化的 FunASR 推理服务。这样做有三个好处:
- 模型服务和业务服务解耦
- GPU 调度独立
- Java 业务服务无需承担推理环境管理
15.1 FunASR WebSocket 客户端
package com.example.speech.asr;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import java.nio.charset.StandardCharsets;
@Component
@RequiredArgsConstructor
public class FunAsrClient {
private final ObjectMapper objectMapper;
@Value("${funasr.ws-url}")
private String wsUrl;
public Flux<FunAsrResponse> recognize(Flux<byte[]> audioStream) {
return HttpClient.create()
.websocket(WebsocketClientSpec.builder().handlePing(true).build())
.uri(wsUrl)
.handle((inbound, outbound) -> {
String initJson = """
{"mode":"2pass","wav_name":"stream","chunk_size":[5,10,5],"chunk_interval":10}
""";
Mono<Void> sendInit = outbound.sendString(Mono.just(initJson)).then();
Mono<Void> sendAudio = outbound.sendByteArray(audioStream).then();
Flux<FunAsrResponse> responses = inbound.receive()
.asString(StandardCharsets.UTF_8)
.map(this::parse);
return sendInit.then(sendAudio).thenMany(responses);
});
}
private FunAsrResponse parse(String json) {
try {
JsonNode node = objectMapper.readTree(json);
return new FunAsrResponse(
node.path("text").asText(""),
node.path("is_final").asBoolean(false),
node.path("mode").asText("online")
);
} catch (Exception ex) {
throw new IllegalStateException("Failed to parse FunASR response: " + json, ex);
}
}
}
对应响应对象:
package com.example.speech.asr;
public record FunAsrResponse(
String text,
boolean isFinal,
String mode
) {
}
15.2 ASR Worker 的消费与回写
package com.example.speech.asr;
import com.example.speech.domain.AsrResultPayload;
import com.example.speech.domain.AudioSegmentPayload;
import com.example.speech.domain.SpeechEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Service
@RequiredArgsConstructor
public class AsrWorker {
private final FunAsrClient funAsrClient;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "audio.raw.segment", groupId = "asr-worker-group", concurrency = "6")
public void onAudioSegment(SpeechEvent<AudioSegmentPayload> event) {
Flux<byte[]> stream = Flux.just(event.payload().audioBytes());
funAsrClient.recognize(stream)
.map(result -> new SpeechEvent<>(
UUID.randomUUID().toString(),
event.traceId(),
event.sessionId(),
event.tenantId(),
result.isFinal() ? "ASR_FINAL" : "ASR_PARTIAL",
event.sourceLang(),
event.targetLang(),
event.sequence(),
Instant.now(),
Map.of("engine", "funasr"),
new AsrResultPayload(
result.text(),
!result.isFinal(),
result.isFinal(),
List.of(),
0.98d,
event.payload().startMs(),
event.payload().endMs()
)
))
.doOnNext(output -> kafkaTemplate.send(
resultTopic(output),
output.sessionId(),
output
))
.blockLast();
}
private String resultTopic(SpeechEvent<AsrResultPayload> output) {
return output.payload().partial() ? "asr.partial.result" : "asr.final.result";
}
}
生产建议:
@KafkaListener 使用手动 ack 模式控制位点提交。
- 对 FunASR 调用加超时、熔断、重试。
- partial 结果可以降低保留时间。
- final 结果必须持久化并可回放。
十六、翻译层:不要把翻译服务做成“纯字符串替换器”
真正可用的翻译层,至少要解决四件事:
- 语言识别与路由
- 术语表命中
- 上下文记忆
- 多引擎策略切换
16.1 翻译网关接口
package com.example.speech.translation;
public interface TranslationGateway {
TranslationResult translate(TranslationCommand command);
}
package com.example.speech.translation;
import java.util.List;
public record TranslationCommand(
String tenantId,
String sessionId,
String sourceLang,
String targetLang,
String text,
String context,
List<String> glossaryTerms,
boolean lowLatencyMode
) {
}
package com.example.speech.translation;
import java.util.List;
public record TranslationResult(
String translatedText,
String engine,
boolean llmEnhanced,
List<String> glossaryHits
) {
}
16.2 多引擎路由策略
package com.example.speech.translation;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class SmartTranslationGateway implements TranslationGateway {
private final NmtGateway nmtGateway;
private final LlmGateway llmGateway;
@Override
public TranslationResult translate(TranslationCommand command) {
if (command.lowLatencyMode() || command.text().length() < 40) {
return nmtGateway.translate(command);
}
TranslationResult base = nmtGateway.translate(command);
TranslationCommand enhanced = new TranslationCommand(
command.tenantId(),
command.sessionId(),
command.sourceLang(),
command.targetLang(),
base.translatedText(),
command.context(),
command.glossaryTerms(),
false
);
return llmGateway.translate(enhanced);
}
}
这个设计体现了一个关键思想:
NMT 保证主链路低延迟,LLM 只在需要时做增强。
16.3 翻译 Worker
package com.example.speech.translation;
import com.example.speech.domain.AsrResultPayload;
import com.example.speech.domain.SpeechEvent;
import com.example.speech.domain.TranslationResultPayload;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Service
@RequiredArgsConstructor
public class TranslationWorker {
private final SmartTranslationGateway translationGateway;
private final SessionContextService sessionContextService;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "asr.final.result", groupId = "translation-worker-group", concurrency = "8")
public void onFinalAsr(SpeechEvent<AsrResultPayload> event) {
String context = sessionContextService.contextText(event.sessionId());
List<String> glossary = sessionContextService.glossary(event.tenantId());
TranslationResult result = translationGateway.translate(new TranslationCommand(
event.tenantId(),
event.sessionId(),
event.sourceLang(),
event.targetLang(),
event.payload().text(),
context,
glossary,
false
));
SpeechEvent<TranslationResultPayload> output = new SpeechEvent<>(
UUID.randomUUID().toString(),
event.traceId(),
event.sessionId(),
event.tenantId(),
"TRANSLATION_RESULT",
event.sourceLang(),
event.targetLang(),
event.sequence(),
Instant.now(),
Map.of("engine", result.engine()),
new TranslationResultPayload(
event.payload().text(),
result.translatedText(),
result.engine(),
result.llmEnhanced(),
result.glossaryHits()
)
);
sessionContextService.appendFinalText(event.sessionId(), event.payload().text());
kafkaTemplate.send("translation.result", output.sessionId(), output);
}
}
十七、TTS 层:高并发下的关键不是“能合成”,而是“能复用”
TTS 最容易被忽略的问题是重复计算。
在真实场景中,很多句子会反复出现:
- 欢迎词
- 固定会议术语
- 常见客服回复
- UI 操作提示
如果每次都重新合成,GPU 成本会被白白放大。
17.1 TTS 请求模型
package com.example.speech.tts;
public record TtsCommand(
String sessionId,
String tenantId,
String lang,
String voice,
String text,
boolean streamMode
) {
}
17.2 带缓存的 TTS 服务
package com.example.speech.tts;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
@Service
@RequiredArgsConstructor
public class TtsCachingService {
private final TtsGateway ttsGateway;
private final StringRedisTemplate redisTemplate;
private final AudioObjectStorage audioObjectStorage;
public String synthesize(TtsCommand command) {
String key = "tts:" + hash(command.lang() + "|" + command.voice() + "|" + command.text());
String cached = redisTemplate.opsForValue().get(key);
if (cached != null) {
return cached;
}
byte[] audio = ttsGateway.synthesize(command);
String objectKey = audioObjectStorage.put(audio, "audio/mpeg");
redisTemplate.opsForValue().set(key, objectKey);
return objectKey;
}
private String hash(String text) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return HexFormat.of().formatHex(digest.digest(text.getBytes(StandardCharsets.UTF_8)));
} catch (Exception ex) {
throw new IllegalStateException(ex);
}
}
}
这段代码的价值不在“缓存语法”,而在两个工程点:
- 对副作用结果做内容寻址
- 把 GPU 重活转成可复用静态资产
十八、失败处理:DLQ、补偿、人工回放
一条成熟链路必须具备失败可恢复能力。
18.1 死信队列设计
下列事件建议进入 speech.pipeline.dlq:
- 连续重试后仍失败的 ASR 片段
- 翻译引擎超时且超过最大重试次数
- TTS 返回非法音频
- 事件反序列化失败
- 会话状态缺失导致无法继续处理
DLQ 消息建议追加这些字段:
failedStage
errorCode
errorMessage
retryCount
originalTopic
originalPartition
originalOffset
18.2 补偿原则
- 只补偿 final 结果,不补偿 partial。
- 重放必须幂等。
- 人工回放应支持按
traceId 或 sessionId 查询。
- 会话已结束时,补偿结果只做归档,不再回推前端。
十九、可观测性:没有观测,所谓实时系统就是黑盒
生产系统必须至少做到“三件套”:
19.1 必须监控的核心指标
接入层指标
- 当前在线连接数
- 每秒接收音频帧数
- 单租户活跃会话数
- 心跳超时断开数
- 鉴权失败数
Kafka 指标
- Topic 吞吐
- Consumer Lag
- Rebalance 次数
- 失败重试次数
- DLQ 速率
模型服务指标
- ASR RT
- 翻译 RT
- TTS RT
- GPU 利用率
- GPU 显存占用
- 推理队列长度
用户体验指标
- 首字幕延迟
- 端到端翻译延迟
- 音频首包延迟
- partial 到 final 回滚率
- 句段丢失率
19.2 Micrometer 指标示例
package com.example.speech.metrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;
@Component
public class SpeechMetrics {
private final Counter asrFailures;
private final Timer translationLatency;
public SpeechMetrics(MeterRegistry registry) {
this.asrFailures = Counter.builder("speech_asr_failures_total")
.description("Total ASR failures")
.register(registry);
this.translationLatency = Timer.builder("speech_translation_latency")
.publishPercentileHistogram()
.register(registry);
}
public void markAsrFailure() {
asrFailures.increment();
}
public Timer.Sample startTranslationTimer() {
return Timer.start();
}
public void stopTranslationTimer(Timer.Sample sample) {
sample.stop(translationLatency);
}
}
19.3 Trace 设计建议
建议以 traceId 打通:
- Gateway
- Kafka producer
- Kafka consumer
- ASR worker
- Translation worker
- TTS worker
- Result dispatcher
这样你才能在排障时看清楚:
二十、高并发设计:百万级并发如何拆解
“百万并发”不是靠一台超大机器,而是靠分层拆解。
20.1 容量拆解思路
假设:
- 单会话每秒 10 个音频帧
- 峰值在线会话 10 万
- 则入口每秒音频事件数约 100 万
这时候要拆成三个问题:
- 网关能否扛住 10 万长连接
- Kafka 能否扛住每秒 100 万片段事件
- GPU worker 能否在合适窗口内消化可识别片段
注意,并不是每个音频帧都触发 ASR final。经过 VAD 和聚合后,真正进入翻译和 TTS 的事件量会显著下降。
20.2 分层容量公式
网关容量
网关实例数估算:
网关实例数 = 在线连接数 / 单实例稳定连接承载能力 * 冗余系数
例如:
- 单实例稳定支撑 2 万连接
- 目标在线连接数 10 万
- 冗余系数 1.5
则至少需要:
10万 / 2万 * 1.5 = 7.5
即至少 8 个 Gateway Pod。
Kafka 分区数
分区数 >= 峰值消费者实例数 * 1.5
如果你预计 ASR Worker 峰值会扩到 32 个实例,那么建议分区数至少 48。
GPU Worker 数量
GPU 实例数 = 峰值识别请求数 / 单实例稳定 QPS * 冗余系数
如果单个 FunASR GPU Pod 稳定 QPS 为 120,峰值有效识别请求为 3000 QPS,冗余系数取 1.3:
3000 / 120 * 1.3 ≈ 32.5
则建议配 33 个 ASR GPU 实例。
二十一、Kubernetes 生产部署设计
K8s 不是“把容器跑起来”这么简单,而是资源分层调度平台。
21.1 节点池拆分建议
建议至少拆成三类节点池:
gateway-cpu-pool
worker-cpu-pool
gpu-inference-pool
原因:
- Gateway 更关注网络连接能力
- 编排服务更关注 CPU、内存、GC
- 推理服务更关注 GPU 显存与 NUMA 拓扑
21.2 Gateway Deployment 示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: speech-gateway
spec:
replicas: 8
selector:
matchLabels:
app: speech-gateway
template:
metadata:
labels:
app: speech-gateway
spec:
containers:
- name: speech-gateway
image: registry.company.com/ai/speech-gateway:1.0.0
ports:
- containerPort: 8080
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
env:
- name: JAVA_TOOL_OPTIONS
value: "-XX:+UseG1GC -XX:MaxRAMPercentage=75"
readinessProbe:
httpGet:
path: /actuator/health/readiness
port: 8080
livenessProbe:
httpGet:
path: /actuator/health/liveness
port: 8080
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: speech-gateway
topologyKey: kubernetes.io/hostname
21.3 GPU Worker Deployment 示例
apiVersion: apps/v1
kind: Deployment
metadata:
name: asr-worker
spec:
replicas: 12
selector:
matchLabels:
app: asr-worker
template:
metadata:
labels:
app: asr-worker
spec:
nodeSelector:
node-pool: gpu-inference-pool
containers:
- name: asr-worker
image: registry.company.com/ai/asr-worker:1.0.0
resources:
requests:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
limits:
cpu: "8"
memory: "16Gi"
nvidia.com/gpu: "1"
21.4 HPA 不能只看 CPU
AI 推理系统如果只按 CPU 扩容,通常会做错。
推荐 HPA/KEDA 指标:
- Kafka consumer lag
- 自定义推理队列长度
- GPU utilization
- 在线连接数
- P95 延迟
例如:
- Gateway 根据连接数与流量扩缩
- ASR Worker 根据
audio.raw.segment lag 扩缩
- Translation Worker 根据
asr.final.result lag 扩缩
二十二、Resilience4j:超时、隔离、熔断必须是默认配置
调用模型服务或第三方翻译接口时,必须加治理层。
22.1 典型治理策略
- Timeout:避免单次调用拖死线程池
- Retry:仅对幂等调用重试
- CircuitBreaker:故障期快速失败
- Bulkhead:限制并发,避免资源打满
- RateLimiter:保护外部接口额度
22.2 示例代码
package com.example.speech.translation;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Component
public class LlmGateway {
@TimeLimiter(name = "llmTranslate")
@Retry(name = "llmTranslate")
@CircuitBreaker(name = "llmTranslate", fallbackMethod = "fallback")
@Bulkhead(name = "llmTranslate")
public TranslationResult translate(TranslationCommand command) {
return new TranslationResult(
"[LLM Enhanced] " + command.text(),
"llm",
true,
List.of()
);
}
public TranslationResult fallback(TranslationCommand command, Throwable throwable) {
return new TranslationResult(command.text(), "fallback", false, List.of());
}
}
二十三、数据存储设计:冷热分层与归档
实时系统不能把所有数据都塞数据库。
建议这样分:
23.1 热数据
- Redis:会话态、最近上下文、热点缓存
- Kafka:事件流与短期回放
23.2 温数据
- Elasticsearch / OpenSearch:日志、检索、字幕回放查询
- ClickHouse:运营分析、性能分析、租户计费
23.3 冷数据
- 对象存储:原始音频、TTS 产物、归档字幕、回放素材
23.4 归档建议
归档对象建议包含:
- 原始音频 URL
- 分段时间轴
- ASR final 文本
- 翻译文本
- TTS 音频 URL
- Speaker 分离结果
- 会话级统计指标
这样既能支持回放,也能支持后续训练数据沉淀。
二十四、真实业务案例:跨境直播翻译机器人
下面给一个更接近实战的案例。
24.1 场景描述
某跨境直播平台需要给中文主播提供英文、西班牙文实时翻译能力,要求:
- 字幕延迟 < 1s
- 语音播报延迟 < 3s
- 高峰同时在线直播间 5000+
- 每个直播间平均 2 名说话人
- 高峰晚间流量是白天 6 倍
24.2 架构策略
- 主播上行音频走 WebSocket。
- Gateway 仅接入与会话鉴权。
session-orchestrator 聚合主播语音片段。
- FunASR 产出 partial 与 final 文本。
partial 只回字幕,不走 TTS。
final 进入翻译。
- 短句优先走 NMT,长句或带营销术语的句子走 NMT + LLM 二次润色。
- 常见直播话术的 TTS 结果缓存到对象存储。
- 高峰期关闭低优先级直播间的语音播报,仅保留字幕。
24.3 为什么这个设计有效
- 字幕链路和播报链路拆开,避免 TTS 拖慢主体验。
- 术语表按直播品类下发,减少“SKU、满减、现货、预售”等术语误译。
- 通过租户优先级和直播间等级做流量调度,确保头部房间优先。
- 可把晚上高峰的翻译增强策略切成基础版,显著降低 GPU 压力。
二十五、常见架构误区
25.1 误区一:所有环节都追求实时
错误。
应该区分:
- 用户可感知实时:字幕、关键播报
- 系统内部异步:归档、统计、回放、训练样本沉淀
25.2 误区二:ASR 一出结果就立即翻译
错误。
应该按 partial/final 区分策略,并让会话上下文参与翻译决策。
25.3 误区三:分区不够后面再加
部分正确,但代价很高。
分区数设计不足会限制消费者并发上限,后期再扩需要迁移和重平衡成本,应该尽量在早期按峰值预留。
25.4 误区四:只用 HTTP 同步串行调用就够了
对于小流量 Demo 可以,对生产不行。
原因:
- 无法削峰
- 无法重放
- 难以做阶段治理
- 一旦下游慢,整体雪崩
25.5 误区五:可用性只靠多副本
错误。
真正的可用性来自:
多副本只是其中一小部分。
二十六、上线前检查清单
建议上线前逐项核对:
架构层
- 是否实现了双通道分流
- 是否定义了统一事件头
- 是否有 DLQ 与回放机制
- 是否有租户级限流与优先级
性能层
- 是否做过 10 倍峰值压测
- 是否验证了 Kafka lag 告警
- 是否验证了 GPU 饱和下的降级逻辑
- 是否测过 P95/P99 端到端延迟
可靠性层
- 是否验证了 Kafka rebalance 期间不丢消息
- 是否验证了 ASR worker 重启后的幂等
- 是否验证了 Redis 故障时的兜底逻辑
- 是否验证了对象存储不可用时的降级策略
运维层
- 是否有全链路 Trace
- 是否有核心业务指标看板
- 是否有租户级成本统计
- 是否有灰度开关与配置中心
二十七、演进路线:从 0 到 1,再从 1 到 100
如果你的系统还在早期,不必一上来就上最复杂架构。可以按三个阶段演进。
阶段一:MVP 可用版
- Gateway
- FunASR
- 基础翻译接口
- TTS
- Redis 会话态
- Kafka 主链路
目标:
阶段二:生产稳定版
- partial/final 分流
- DLQ + 补偿
- 多租户配额
- 指标、日志、Trace
- HPA/KEDA
- TTS 缓存
目标:
阶段三:平台化版
- 多模型路由
- 术语平台
- 会话画像
- 质量评估闭环
- 成本优化引擎
- 训练数据反哺
目标:
二十八、总结:一套真正能上生产的实时语音翻译系统,核心不是模型,而是系统工程
回到最初的问题:为什么很多语音翻译 Demo 能跑,但产品一上线就不稳?
因为 Demo 解决的是“功能存在”,而生产解决的是:
- 延迟能否稳定
- 高峰能否扛住
- 失败能否恢复
- 成本能否可控
- 架构能否继续扩展
一套成熟的实时语音翻译机器人,至少应该具备以下能力:
- 用 FunASR 解决低延迟流式识别。
- 用 Kafka 构建可扩展、可重放的事件主干。
- 用 Java 微服务承担接入、编排、治理和状态管理。
- 用 K8s 把 CPU 服务和 GPU 推理服务分层调度。
- 用 Redis、对象存储、OLAP 做冷热分层。
- 用幂等、DLQ、重试、降级和观测体系保障生产稳定。
最终你会发现:
实时语音翻译不是“一个模型服务”,而是一个典型的 AI 原生分布式系统。
模型决定上限,架构决定下限,工程化决定你能不能活着跑到明天。
写在最后
构建这样一个系统涉及广泛的技术栈和深入的架构思考,从流式音频处理到分布式消息中间件,再到微服务治理和云原生部署。如果你对其中提到的 Spring Boot 微服务实践、Kafka 的高并发应用,或是 K8s 的生产级调度有更多兴趣,欢迎到云栈社区 与更多开发者交流探讨。在那里,你可以找到更多关于高并发架构、AI 工程化和云原生技术的深度内容与实战经验分享。