小伙伴们,你们有没有遇到过这样的新同事:入职第一天,不办入职手续、不领电脑、不申请各种权限,自己带着笔记本往工位一坐,打开 IDEA,手指在键盘上飞起,噼里啪啦一顿操作,俩小时后甩给你一个方案,直接把你引以为傲的“祖传代码”按在地上摩擦?
想象一下,你们团队上周就空降了这么一位——前谷歌工程师,据说在硅谷搞过广告实时竞价系统。结果人家到岗第一天下午,就撞上了产品经理的夺命 call。
一、崩溃现场:消息推送又卡死,产品经理要砸电脑
产品经理小张冲进你们工位,脸涨得通红:“用户消息推送又卡死了!后台积压了五十万条推送,再不发出去,用户都以为我们 app 死了!”
你们几个老油条立刻围成一团,开始经典救火三板斧:
- “要不加个定时任务,每分钟扫一次积压消息?”
- “扩容服务器吧,搞几台高性能的,堆硬件!”
- “肯定是数据库连接池太小了,改大点,再大点!”
正讨论得热火朝天,角落里默默敲代码的新同事突然抬起头,摘下耳机,淡定地问:“你们用的是单线程轮询吧?”
你们一愣:“是啊,怎么了?”
他笑了笑:“那难怪会卡死。让我试试?”
然后他转身回工位,戴上耳机,双手往键盘上一搭——接下来两个小时,你们见证了什么叫“降维打击”。
两小时后,他把一个叫“多线程永动推送引擎”的代码扔到群里。结果你猜怎么着?消息推送速度直接飙升 10 倍,积压的五十万条消息半小时就发完了,内存稳得一批,CPU 利用率还不到 30%。
你们凑过去看他代码时的表情,就像山顶洞人第一次看见打火机:每个符号都认识,连在一起就看不懂了!后来他花了半小时给大家捋了一遍,大家这才恍然大悟:原来 多线程 还能这么玩?
二、先整明白:啥叫“永动任务”?和永动机有啥区别?
在说他的方案前,咱们要先搞清楚一个问题:啥叫“永动任务”?难道真能像永动机一样不用管,自己跑一辈子?
其实这玩意儿本质就是“能一直跑、不中断、还能自动处理任务”的多线程程序。就拿我们遇到的消息推送来说,传统玩法是这样的:
- 写个单线程程序,循环查数据库,查出一条推送一条。(像一个人搬砖,累死累活)
- 觉得慢?那就开个线程池,每次批量查 100 条扔给线程池推送。(像找了几个搬砖工,但大家抢着搬)
- 怕程序崩了?加个 try-catch,崩了就手动重启。(像砖倒了,再叫人扶起来)
但这玩法坑太多了,例如:
- 任务断档:线程池里的任务跑完了,得等下一次定时任务触发才会继续查数据,中间空窗期可能好几分钟。
- 线程浪费:如果消息少的时候,线程池里的线程全闲着,占着内存还没活儿干。
- 崩了没人管:万一程序因为内存溢出崩了,得等运维发现了才能重启,中间丢的数据都找不回来。
- 不好扩展:想加个“消息去重”功能,得改核心代码,改完还得重新部署,一不小心就引入新 BUG。
那他搞的“永动推送引擎”是咋解决这些问题的?简单说就是三个特点:
- 不停机:任务处理完一批,自动捞下一批,中间不空档,跟永动机似的。
- 自修复:某个线程崩了,自动重启,不影响整体。
- 好扩展:想加新功能,直接加“处理器”,不用改核心逻辑,跟乐高积木一样。
三、拆解方案:300行代码搞定永动推送核心
当时他扔出来的代码,核心也就 300 来行,咱分步骤拆开看。先说明下,他用的是 Java 21 的新特性虚拟线程(Virtual Threads),但思路换传统线程池也能用,咱重点看逻辑。
1. 先搭个“任务传送带”:让任务像流水线一样跑起来
首先得有个“传送带”,能装任务,还能让任务循环跑。他用了 生产者-消费者模式,简单说就是:一个线程负责捞消息(生产者),多个线程负责推送消息(消费者),中间用个队列存任务,像工厂流水线一样。
咱先看核心类的结构:
/**
* 永动推送引擎 - 让消息推送像吃了炫迈一样停不下来
*/
public class EternalPushEngine {
// 任务队列:存待推送的消息(生产者放,消费者拿)
private final BlockingQueue<PushTask> taskQueue;
// 消费者执行器:处理消息的线程(用虚拟线程,轻量级,想开多少开多少)
private final ExecutorService consumerExecutor;
// 生产者执行器:捞消息的线程
private final ExecutorService producerExecutor;
// 开关:控制任务启停
private volatile boolean running = false;
// 监控线程:用于自愈和统计
private Thread monitorThread;
/**
* 初始化引擎
* @param consumerNum 消费者线程数(工人数量)
* @param queueCapacity 队列容量(原材料仓库大小)
*/
public EternalPushEngine(int consumerNum, int queueCapacity) {
// 用LinkedBlockingQueue,满了会阻塞生产者,避免内存爆了(就像仓库满了,采购员就先歇着)
this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
// 消费者用虚拟线程池,可以开很多,不耗资源(像请了一群兼职,随叫随到)
this.consumerExecutor = Executors.newVirtualThreadPerTaskExecutor();
// 生产者用单线程虚拟线程(只需要一个采购员)
this.producerExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
/**
* 启动引擎
*/
public synchronized void start() {
if (running) {
log("引擎已经在跑了,别急~");
return;
}
running = true;
// 启动生产者
producerExecutor.submit(this::produceLoop);
// 启动消费者
for (int i = 0; i < consumerNum; i++) {
consumerExecutor.submit(this::consumeLoop);
}
// 启动监控线程(用于自愈和健康检查)
monitorThread = new Thread(this::monitorLoop, "Monitor-Thread");
monitorThread.start();
log("永动推送引擎启动成功,比你家楼下早餐店还敬业!");
}
/**
* 停止引擎(优雅关闭)
*/
public synchronized void stop() {
running = false;
// 中断监控线程
if (monitorThread != null) {
monitorThread.interrupt();
}
// 关闭线程池
producerExecutor.shutdown();
consumerExecutor.shutdown();
try {
// 等30秒
if (!producerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
producerExecutor.shutdownNow();
}
if (!consumerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
consumerExecutor.shutdownNow();
}
} catch (InterruptedException e) {
producerExecutor.shutdownNow();
consumerExecutor.shutdownNow();
}
log("引擎已停止,所有任务都处理完了,放心~");
}
/**
* 生产者循环:捞消息
*/
private void produceLoop() {
while (running) {
try {
// 1. 从数据库捞待推送的消息(每次捞200条)
List<PushMessage> messages = fetchPendingMessages(200);
if (messages.isEmpty()) {
// 没消息?歇2秒再捞,别把数据库查崩了(像钓鱼,没鱼就歇会儿)
Thread.sleep(2000);
continue;
}
// 2. 包装成任务,丢进队列
for (PushMessage msg : messages) {
PushTask task = new PushTask(msg);
// 队列满了会阻塞,不会OOM(像超市收银台排满了,后面的顾客就等着)
taskQueue.put(task);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logErr("捞消息翻车了:" + e.getMessage());
try {
// 出错歇5秒再试
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* 消费者循环:推送消息
*/
private void consumeLoop() {
while (running) {
try {
// 从队列拿任务,没任务会阻塞(不用轮询,省CPU)
PushTask task = taskQueue.take();
// 处理任务(这里可以加超时控制,后面讲)
processTask(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logErr("推送任务翻车了:" + e.getMessage());
}
}
}
/**
* 处理单个任务
*/
private void processTask(PushTask task) {
// 这里会调用注册的处理器链(后面讲)
for (PushTaskProcessor processor : processors) {
if (task.isProcessed()) break;
processor.process(task);
}
}
// 模拟从数据库捞消息
private List<PushMessage> fetchPendingMessages(int limit) {
// 实际查数据库,返回待推送消息
return MessageDAO.fetchPending(limit);
}
// 日志辅助
private void log(String msg) { System.out.println("[" + Thread.currentThread().getName() + "] " + msg); }
private void logErr(String msg) { System.err.println("[" + Thread.currentThread().getName() + "] ERROR: " + msg); }
}
这段代码看着长,其实核心就三件事:
- 生产者:不停捞消息,丢进队列,没消息就歇2秒,避免空转。
- 消费者:从队列拿任务处理,没任务就阻塞,不用一直轮询,省 CPU。
- 开关控制:start/stop 方法,优雅启停,避免丢任务。
这里有个细节:他用的是 Java 21 的虚拟线程(Virtual Threads),消费者线程数可以开很大(比如几百个)而不耗太多资源。传统线程池线程数受限于系统资源,虚拟线程可以轻松开上千个,非常适合 IO 密集型任务。
2. 加个“自动修复功能”:线程挂了自动复活,比小强还顽强
光有循环跑还不够,万一某个消费者线程处理任务时抛了个异常崩了咋办?比如推送接口超时,或者消息格式异常导致 NPE,线程直接挂了,那少个线程处理效率不就降了?
他用虚拟线程天然不怕挂,因为虚拟线程是 JVM 管理的,挂了一个不影响其他。但为了更健壮,他加了个监控线程,定期检查消费者线程的健康状况。
/**
* 监控循环:定期检查消费者健康,自动修复
*/
private void monitorLoop() {
// 记录上一次检查时的消费者数量
int lastConsumerCount = 0;
while (running) {
try {
// 每10秒检查一次
Thread.sleep(10000);
// 1. 检查队列堆积情况
int queueSize = taskQueue.size();
if (queueSize > taskQueue.remainingCapacity() * 0.8) {
log("警告:队列堆积严重,当前大小:" + queueSize);
// 可以动态增加消费者,这里简化,只是警告
}
// 2. 检查消费者数量(虚拟线程池没有直接API获取线程数,但我们可以通过记录提交的任务数来估算)
// 这里简单处理:如果队列不为空但很长时间没变化,可能消费者都死了,就重启一批
// 实际可以用更复杂的机制,比如每个任务记录时间戳
// 3. 检查生产者是否还活着
// 同样简化处理
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
logErr("监控线程出错了:" + e.getMessage());
}
}
}
但这还不够,如果某个消费者线程因为代码 bug 导致永久阻塞(比如死锁),监控线程也救不了。他用了更狠的一招:给每个任务设置超时时间,超过指定时间就放弃,防止任务卡死线程。
// 在消费者循环中,用CompletableFuture加超时
private void consumeLoop() {
while (running) {
try {
PushTask task = taskQueue.take();
// 用CompletableFuture执行任务,并设置超时
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> processTask(task), consumerExecutor);
// 等待最多5秒
future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
logErr("任务处理超时,放弃:" + e.getMessage());
// 可以记录失败,稍后重试
} catch (Exception e) {
logErr("消费者循环异常:" + e.getMessage());
}
}
}
这样,即使某个任务卡住了,5 秒后也会超时放弃,线程可以继续处理下一个任务。
3. 搞“可插拔处理器”:加功能像换手机壳,不用动内核
需求天天变。今天要加消息去重,明天要加推送统计。如果每次改需求都要动核心代码,早晚得改出 BUG。
他在设计时用了 责任链模式,把每个处理步骤抽象成独立的处理器,可以动态组合。
先定义处理器接口:
/**
* 推送任务处理器接口 - 所有处理逻辑都实现这个接口(像手机壳,想换就换)
*/
public interface PushTaskProcessor {
/**
* 处理任务
* @param task 推送任务
*/
void process(PushTask task);
/**
* 处理器优先级(数字越小越先执行)
*/
default int getOrder() {
return 0;
}
}
然后在引擎中维护一个处理器列表,并添加注册方法:
// 处理器列表
private final List<PushTaskProcessor> processors = new CopyOnWriteArrayList<>();
/**
* 注册处理器
*/
public void registerProcessor(PushTaskProcessor processor) {
processors.add(processor);
// 按优先级排序
processors.sort(Comparator.comparingInt(PushTaskProcessor::getOrder));
log("注册处理器:" + processor.getClass().getSimpleName());
}
修改 processTask 方法,遍历调用处理器:
private void processTask(PushTask task) {
for (PushTaskProcessor processor : processors) {
if (task.isProcessed()) {
break; // 如果任务已被标记为完成(比如去重时过滤掉),就停止后续处理
}
try {
processor.process(task);
} catch (Exception e) {
logErr("处理器 " + processor.getClass().getSimpleName() + " 执行失败:" + e.getMessage());
// 根据策略决定是否继续,这里选择继续下一个处理器
}
}
}
现在要加功能就简单了,比如加个“消息去重”处理器:
/**
* 消息去重处理器 - 优先级1,最先执行
*/
public class DeduplicationProcessor implements PushTaskProcessor {
// 简单用本地缓存去重,生产环境可以用Redis
private final Set<String> sentMsgIds = ConcurrentHashMap.newKeySet();
@Override
public void process(PushTask task) {
String msgId = task.getMessage().getMsgId();
if (sentMsgIds.contains(msgId)) {
System.out.println("消息重复,丢弃:" + msgId);
task.setProcessed(true); // 标记已完成,后续处理器不再执行
} else {
sentMsgIds.add(msgId);
}
}
@Override
public int getOrder() {
return 1;
}
}
使用时,只需要注册处理器:
EternalPushEngine engine = new EternalPushEngine(20, 5000);
engine.registerProcessor(new DeduplicationProcessor());
engine.registerProcessor(new ChannelSelectorProcessor());
engine.registerProcessor(new PushExecutorProcessor());
engine.start();
后来产品说要加“推送限流”,只用 10 分钟就写了个 RateLimitProcessor 注册进去就行,完全没动核心代码。这就是 设计模式 中“开闭原则”的实战——对扩展开放,对修改关闭。
四、实战场景:这玩意儿能解决哪些实际问题?
光说理论没用,咱得说点实际的。这个永动推送引擎,后来在咱团队用了好几个场景,效果都贼好。
1. 电商促销推送:千万级用户毫秒级触达
618大促时,运营要推送促销消息给所有用户,传统定时任务一次推送几万条就把数据库拖垮了。用这个引擎:
- 生产者:从用户表分批捞用户ID,每次1万条,丢进队列。
- 消费者:200个虚拟线程同时推送,调用第三方推送接口。
- 处理器:加了“用户分群过滤”“推送频率控制”“失败重试”。
结果5000万用户,1小时内全部推送完。
2. 实时风控:处理交易事件不延迟
交易系统需要实时分析每笔交易是否涉及欺诈,之前用单线程分析,遇到大促就积压。改用引擎后:
- 生产者:监听交易消息队列(如 Kafka),每来一条就包装成任务。
- 消费者:50个线程并行跑风控规则引擎。
- 处理器:加了“黑白名单过滤”“规则计算”“风险等级打分”。
现在每秒能处理5000笔交易,延迟低于100ms。
五、避坑指南:他手把手教咱踩过的坑
他说,这框架看着简单,但实际用的时候有几个坑,他以前在谷歌踩过,咱可别再踩了。
1. 队列容量别设太大:小心内存爆了
有个同事觉得“队列越大越好”,把队列容量设成了10万。结果某天消息暴增,队列里堆了8万条任务,每条任务1KB,直接占了80MB内存,服务器内存直接飙到95%,差点宕机。
建议:队列大小要根据服务器内存算,留出足够缓冲。用 BlockingQueue,满了会阻塞生产者,这是优点不是缺点。
2. 虚拟线程虽好,但别滥用
我们刚开始用虚拟线程,觉得可以开无限多,就开了1000个消费者。结果发现 CPU 没爆,但内存涨得很快。
建议:IO密集型任务,消费者数一般设为“预期并发连接数”即可,比如调用第三方接口,对方能承受的并发有限,开太多反而会打爆对方。
3. 一定要加任务超时控制:避免线程永久阻塞
有次推送接口突然变慢,响应时间从100ms变成10秒,结果线程全卡在等待响应,队列越积越多。幸好加了超时控制,任务超过5秒就被放弃,系统没崩溃。
建议:超时时间要结合业务合理设置,并记录失败任务后续处理。
4. 重试机制要带退避,别死循环
失败重试是好习惯,但如果不加控制,一个失败任务会无限重试,占满队列。他设计了一个重试策略:每个任务最多重试3次,且重试间隔指数退避(1秒、2秒、4秒)。这样临时故障能恢复,永久故障不会死循环。
// 在任务类中增加重试计数
public class PushTask {
private int retryCount = 0;
private static final int MAX_RETRIES = 3;
public boolean canRetry() {
return retryCount < MAX_RETRIES;
}
public void incrRetry() {
retryCount++;
}
}
// 在处理器链外层捕获异常,决定是否重试
catch (Exception e) {
if (task.canRetry()) {
task.incrRetry();
// 指数退避:2^retryCount * 1000 ms
long delay = (long) (Math.pow(2, task.getRetryCount()) * 1000);
Thread.sleep(delay);
taskQueue.put(task); // 重新入队
} else {
logErr("任务重试" + MAX_RETRIES + "次失败,进入死信队列");
sendToDeadLetter(task);
}
}
5. 监控不能少:可视化面板很有必要
他最后强调,再好的框架也需要监控。他顺手给引擎加了个简单的 JMX Bean,暴露队列大小、处理速度、失败数等指标,然后用 Prometheus + Grafana 展示。这样一眼就能看出系统健康状态。
六、总结性的说两句
我们后来问他,为啥他能这么快搞出这个框架。他说其实不是他厉害,而是掌握了“三个套路”:
- 复用成熟模式:生产者-消费者、责任链模式这些都是经典设计模式,不用自己瞎造轮子。
- 抓核心需求:当时的核心需求是“消息推送不中断、能自动恢复、易扩展”,所以重点放在了“永动循环”“自愈”“可插拔”上。
- 踩过的坑多:他以前在谷歌搞广告系统时,遇到过各种线程问题、内存问题,所以这次直接避开了这些坑。
我们听完恍然大悟:原来厉害的程序员不是能写出多复杂的代码,而是能把简单的东西用对地方,解决实际问题。
如果你对这类解决实际业务场景的高并发、可扩展架构设计感兴趣,欢迎来 云栈社区 交流探讨,这里有更多来自一线的实战案例和技术分享。