找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

3760

积分

0

好友

518

主题
发表于 1 小时前 | 查看: 3| 回复: 0

小伙伴们,你们有没有遇到过这样的新同事:入职第一天,不办入职手续、不领电脑、不申请各种权限,自己带着笔记本往工位一坐,打开 IDEA,手指在键盘上飞起,噼里啪啦一顿操作,俩小时后甩给你一个方案,直接把你引以为傲的“祖传代码”按在地上摩擦?

想象一下,你们团队上周就空降了这么一位——前谷歌工程师,据说在硅谷搞过广告实时竞价系统。结果人家到岗第一天下午,就撞上了产品经理的夺命 call。

一、崩溃现场:消息推送又卡死,产品经理要砸电脑

产品经理小张冲进你们工位,脸涨得通红:“用户消息推送又卡死了!后台积压了五十万条推送,再不发出去,用户都以为我们 app 死了!”

你们几个老油条立刻围成一团,开始经典救火三板斧:

  • “要不加个定时任务,每分钟扫一次积压消息?”
  • “扩容服务器吧,搞几台高性能的,堆硬件!”
  • “肯定是数据库连接池太小了,改大点,再大点!”

正讨论得热火朝天,角落里默默敲代码的新同事突然抬起头,摘下耳机,淡定地问:“你们用的是单线程轮询吧?”

你们一愣:“是啊,怎么了?”

他笑了笑:“那难怪会卡死。让我试试?”

然后他转身回工位,戴上耳机,双手往键盘上一搭——接下来两个小时,你们见证了什么叫“降维打击”。

两小时后,他把一个叫“多线程永动推送引擎”的代码扔到群里。结果你猜怎么着?消息推送速度直接飙升 10 倍,积压的五十万条消息半小时就发完了,内存稳得一批,CPU 利用率还不到 30%。

你们凑过去看他代码时的表情,就像山顶洞人第一次看见打火机:每个符号都认识,连在一起就看不懂了!后来他花了半小时给大家捋了一遍,大家这才恍然大悟:原来 多线程 还能这么玩?

二、先整明白:啥叫“永动任务”?和永动机有啥区别?

在说他的方案前,咱们要先搞清楚一个问题:啥叫“永动任务”?难道真能像永动机一样不用管,自己跑一辈子?

其实这玩意儿本质就是“能一直跑、不中断、还能自动处理任务”的多线程程序。就拿我们遇到的消息推送来说,传统玩法是这样的:

  1. 写个单线程程序,循环查数据库,查出一条推送一条。(像一个人搬砖,累死累活)
  2. 觉得慢?那就开个线程池,每次批量查 100 条扔给线程池推送。(像找了几个搬砖工,但大家抢着搬)
  3. 怕程序崩了?加个 try-catch,崩了就手动重启。(像砖倒了,再叫人扶起来)

但这玩法坑太多了,例如:

  • 任务断档:线程池里的任务跑完了,得等下一次定时任务触发才会继续查数据,中间空窗期可能好几分钟。
  • 线程浪费:如果消息少的时候,线程池里的线程全闲着,占着内存还没活儿干。
  • 崩了没人管:万一程序因为内存溢出崩了,得等运维发现了才能重启,中间丢的数据都找不回来。
  • 不好扩展:想加个“消息去重”功能,得改核心代码,改完还得重新部署,一不小心就引入新 BUG。

那他搞的“永动推送引擎”是咋解决这些问题的?简单说就是三个特点:

  1. 不停机:任务处理完一批,自动捞下一批,中间不空档,跟永动机似的。
  2. 自修复:某个线程崩了,自动重启,不影响整体。
  3. 好扩展:想加新功能,直接加“处理器”,不用改核心逻辑,跟乐高积木一样。

三、拆解方案:300行代码搞定永动推送核心

当时他扔出来的代码,核心也就 300 来行,咱分步骤拆开看。先说明下,他用的是 Java 21 的新特性虚拟线程(Virtual Threads),但思路换传统线程池也能用,咱重点看逻辑。

1. 先搭个“任务传送带”:让任务像流水线一样跑起来

首先得有个“传送带”,能装任务,还能让任务循环跑。他用了 生产者-消费者模式,简单说就是:一个线程负责捞消息(生产者),多个线程负责推送消息(消费者),中间用个队列存任务,像工厂流水线一样。

咱先看核心类的结构:

/**
 * 永动推送引擎 - 让消息推送像吃了炫迈一样停不下来
 */
public class EternalPushEngine {
    // 任务队列:存待推送的消息(生产者放,消费者拿)
    private final BlockingQueue<PushTask> taskQueue;
    // 消费者执行器:处理消息的线程(用虚拟线程,轻量级,想开多少开多少)
    private final ExecutorService consumerExecutor;
    // 生产者执行器:捞消息的线程
    private final ExecutorService producerExecutor;
    // 开关:控制任务启停
    private volatile boolean running = false;
    // 监控线程:用于自愈和统计
    private Thread monitorThread;

    /**
     * 初始化引擎
     * @param consumerNum 消费者线程数(工人数量)
     * @param queueCapacity 队列容量(原材料仓库大小)
     */
    public EternalPushEngine(int consumerNum, int queueCapacity) {
        // 用LinkedBlockingQueue,满了会阻塞生产者,避免内存爆了(就像仓库满了,采购员就先歇着)
        this.taskQueue = new LinkedBlockingQueue<>(queueCapacity);
        // 消费者用虚拟线程池,可以开很多,不耗资源(像请了一群兼职,随叫随到)
        this.consumerExecutor = Executors.newVirtualThreadPerTaskExecutor();
        // 生产者用单线程虚拟线程(只需要一个采购员)
        this.producerExecutor = Executors.newVirtualThreadPerTaskExecutor();
    }

    /**
     * 启动引擎
     */
    public synchronized void start() {
        if (running) {
            log("引擎已经在跑了,别急~");
            return;
        }
        running = true;
        // 启动生产者
        producerExecutor.submit(this::produceLoop);
        // 启动消费者
        for (int i = 0; i < consumerNum; i++) {
            consumerExecutor.submit(this::consumeLoop);
        }
        // 启动监控线程(用于自愈和健康检查)
        monitorThread = new Thread(this::monitorLoop, "Monitor-Thread");
        monitorThread.start();
        log("永动推送引擎启动成功,比你家楼下早餐店还敬业!");
    }

    /**
     * 停止引擎(优雅关闭)
     */
    public synchronized void stop() {
        running = false;
        // 中断监控线程
        if (monitorThread != null) {
            monitorThread.interrupt();
        }
        // 关闭线程池
        producerExecutor.shutdown();
        consumerExecutor.shutdown();
        try {
            // 等30秒
            if (!producerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                producerExecutor.shutdownNow();
            }
            if (!consumerExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                consumerExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            producerExecutor.shutdownNow();
            consumerExecutor.shutdownNow();
        }
        log("引擎已停止,所有任务都处理完了,放心~");
    }

    /**
     * 生产者循环:捞消息
     */
    private void produceLoop() {
        while (running) {
            try {
                // 1. 从数据库捞待推送的消息(每次捞200条)
                List<PushMessage> messages = fetchPendingMessages(200);
                if (messages.isEmpty()) {
                    // 没消息?歇2秒再捞,别把数据库查崩了(像钓鱼,没鱼就歇会儿)
                    Thread.sleep(2000);
                    continue;
                }
                // 2. 包装成任务,丢进队列
                for (PushMessage msg : messages) {
                    PushTask task = new PushTask(msg);
                    // 队列满了会阻塞,不会OOM(像超市收银台排满了,后面的顾客就等着)
                    taskQueue.put(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logErr("捞消息翻车了:" + e.getMessage());
                try {
                    // 出错歇5秒再试
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * 消费者循环:推送消息
     */
    private void consumeLoop() {
        while (running) {
            try {
                // 从队列拿任务,没任务会阻塞(不用轮询,省CPU)
                PushTask task = taskQueue.take();
                // 处理任务(这里可以加超时控制,后面讲)
                processTask(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                logErr("推送任务翻车了:" + e.getMessage());
            }
        }
    }

    /**
     * 处理单个任务
     */
    private void processTask(PushTask task) {
        // 这里会调用注册的处理器链(后面讲)
        for (PushTaskProcessor processor : processors) {
            if (task.isProcessed()) break;
            processor.process(task);
        }
    }

    // 模拟从数据库捞消息
    private List<PushMessage> fetchPendingMessages(int limit) {
        // 实际查数据库,返回待推送消息
        return MessageDAO.fetchPending(limit);
    }

    // 日志辅助
    private void log(String msg) { System.out.println("[" + Thread.currentThread().getName() + "] " + msg); }
    private void logErr(String msg) { System.err.println("[" + Thread.currentThread().getName() + "] ERROR: " + msg); }
}

这段代码看着长,其实核心就三件事:

  • 生产者:不停捞消息,丢进队列,没消息就歇2秒,避免空转。
  • 消费者:从队列拿任务处理,没任务就阻塞,不用一直轮询,省 CPU。
  • 开关控制:start/stop 方法,优雅启停,避免丢任务。

这里有个细节:他用的是 Java 21 的虚拟线程(Virtual Threads),消费者线程数可以开很大(比如几百个)而不耗太多资源。传统线程池线程数受限于系统资源,虚拟线程可以轻松开上千个,非常适合 IO 密集型任务。

2. 加个“自动修复功能”:线程挂了自动复活,比小强还顽强

光有循环跑还不够,万一某个消费者线程处理任务时抛了个异常崩了咋办?比如推送接口超时,或者消息格式异常导致 NPE,线程直接挂了,那少个线程处理效率不就降了?

他用虚拟线程天然不怕挂,因为虚拟线程是 JVM 管理的,挂了一个不影响其他。但为了更健壮,他加了个监控线程,定期检查消费者线程的健康状况。

/**
 * 监控循环:定期检查消费者健康,自动修复
 */
private void monitorLoop() {
    // 记录上一次检查时的消费者数量
    int lastConsumerCount = 0;
    while (running) {
        try {
            // 每10秒检查一次
            Thread.sleep(10000);

            // 1. 检查队列堆积情况
            int queueSize = taskQueue.size();
            if (queueSize > taskQueue.remainingCapacity() * 0.8) {
                log("警告:队列堆积严重,当前大小:" + queueSize);
                // 可以动态增加消费者,这里简化,只是警告
            }

            // 2. 检查消费者数量(虚拟线程池没有直接API获取线程数,但我们可以通过记录提交的任务数来估算)
            // 这里简单处理:如果队列不为空但很长时间没变化,可能消费者都死了,就重启一批
            // 实际可以用更复杂的机制,比如每个任务记录时间戳

            // 3. 检查生产者是否还活着
            // 同样简化处理

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        } catch (Exception e) {
            logErr("监控线程出错了:" + e.getMessage());
        }
    }
}

但这还不够,如果某个消费者线程因为代码 bug 导致永久阻塞(比如死锁),监控线程也救不了。他用了更狠的一招:给每个任务设置超时时间,超过指定时间就放弃,防止任务卡死线程。

// 在消费者循环中,用CompletableFuture加超时
private void consumeLoop() {
    while (running) {
        try {
            PushTask task = taskQueue.take();
            // 用CompletableFuture执行任务,并设置超时
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> processTask(task), consumerExecutor);
            // 等待最多5秒
            future.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            logErr("任务处理超时,放弃:" + e.getMessage());
            // 可以记录失败,稍后重试
        } catch (Exception e) {
            logErr("消费者循环异常:" + e.getMessage());
        }
    }
}

这样,即使某个任务卡住了,5 秒后也会超时放弃,线程可以继续处理下一个任务。

3. 搞“可插拔处理器”:加功能像换手机壳,不用动内核

需求天天变。今天要加消息去重,明天要加推送统计。如果每次改需求都要动核心代码,早晚得改出 BUG。

他在设计时用了 责任链模式,把每个处理步骤抽象成独立的处理器,可以动态组合。

先定义处理器接口:

/**
 * 推送任务处理器接口 - 所有处理逻辑都实现这个接口(像手机壳,想换就换)
 */
public interface PushTaskProcessor {
    /**
     * 处理任务
     * @param task 推送任务
     */
    void process(PushTask task);

    /**
     * 处理器优先级(数字越小越先执行)
     */
    default int getOrder() {
        return 0;
    }
}

然后在引擎中维护一个处理器列表,并添加注册方法:

// 处理器列表
private final List<PushTaskProcessor> processors = new CopyOnWriteArrayList<>();

/**
 * 注册处理器
 */
public void registerProcessor(PushTaskProcessor processor) {
    processors.add(processor);
    // 按优先级排序
    processors.sort(Comparator.comparingInt(PushTaskProcessor::getOrder));
    log("注册处理器:" + processor.getClass().getSimpleName());
}

修改 processTask 方法,遍历调用处理器:

private void processTask(PushTask task) {
    for (PushTaskProcessor processor : processors) {
        if (task.isProcessed()) {
            break; // 如果任务已被标记为完成(比如去重时过滤掉),就停止后续处理
        }
        try {
            processor.process(task);
        } catch (Exception e) {
            logErr("处理器 " + processor.getClass().getSimpleName() + " 执行失败:" + e.getMessage());
            // 根据策略决定是否继续,这里选择继续下一个处理器
        }
    }
}

现在要加功能就简单了,比如加个“消息去重”处理器:

/**
 * 消息去重处理器 - 优先级1,最先执行
 */
public class DeduplicationProcessor implements PushTaskProcessor {
    // 简单用本地缓存去重,生产环境可以用Redis
    private final Set<String> sentMsgIds = ConcurrentHashMap.newKeySet();

    @Override
    public void process(PushTask task) {
        String msgId = task.getMessage().getMsgId();
        if (sentMsgIds.contains(msgId)) {
            System.out.println("消息重复,丢弃:" + msgId);
            task.setProcessed(true); // 标记已完成,后续处理器不再执行
        } else {
            sentMsgIds.add(msgId);
        }
    }

    @Override
    public int getOrder() {
        return 1;
    }
}

使用时,只需要注册处理器:

EternalPushEngine engine = new EternalPushEngine(20, 5000);
engine.registerProcessor(new DeduplicationProcessor());
engine.registerProcessor(new ChannelSelectorProcessor());
engine.registerProcessor(new PushExecutorProcessor());
engine.start();

后来产品说要加“推送限流”,只用 10 分钟就写了个 RateLimitProcessor 注册进去就行,完全没动核心代码。这就是 设计模式 中“开闭原则”的实战——对扩展开放,对修改关闭。

四、实战场景:这玩意儿能解决哪些实际问题?

光说理论没用,咱得说点实际的。这个永动推送引擎,后来在咱团队用了好几个场景,效果都贼好。

1. 电商促销推送:千万级用户毫秒级触达

618大促时,运营要推送促销消息给所有用户,传统定时任务一次推送几万条就把数据库拖垮了。用这个引擎:

  • 生产者:从用户表分批捞用户ID,每次1万条,丢进队列。
  • 消费者:200个虚拟线程同时推送,调用第三方推送接口。
  • 处理器:加了“用户分群过滤”“推送频率控制”“失败重试”。

结果5000万用户,1小时内全部推送完。

2. 实时风控:处理交易事件不延迟

交易系统需要实时分析每笔交易是否涉及欺诈,之前用单线程分析,遇到大促就积压。改用引擎后:

  • 生产者:监听交易消息队列(如 Kafka),每来一条就包装成任务。
  • 消费者:50个线程并行跑风控规则引擎。
  • 处理器:加了“黑白名单过滤”“规则计算”“风险等级打分”。

现在每秒能处理5000笔交易,延迟低于100ms。

五、避坑指南:他手把手教咱踩过的坑

他说,这框架看着简单,但实际用的时候有几个坑,他以前在谷歌踩过,咱可别再踩了。

1. 队列容量别设太大:小心内存爆了

有个同事觉得“队列越大越好”,把队列容量设成了10万。结果某天消息暴增,队列里堆了8万条任务,每条任务1KB,直接占了80MB内存,服务器内存直接飙到95%,差点宕机。

建议:队列大小要根据服务器内存算,留出足够缓冲。用 BlockingQueue,满了会阻塞生产者,这是优点不是缺点。

2. 虚拟线程虽好,但别滥用

我们刚开始用虚拟线程,觉得可以开无限多,就开了1000个消费者。结果发现 CPU 没爆,但内存涨得很快。

建议:IO密集型任务,消费者数一般设为“预期并发连接数”即可,比如调用第三方接口,对方能承受的并发有限,开太多反而会打爆对方。

3. 一定要加任务超时控制:避免线程永久阻塞

有次推送接口突然变慢,响应时间从100ms变成10秒,结果线程全卡在等待响应,队列越积越多。幸好加了超时控制,任务超过5秒就被放弃,系统没崩溃。

建议:超时时间要结合业务合理设置,并记录失败任务后续处理。

4. 重试机制要带退避,别死循环

失败重试是好习惯,但如果不加控制,一个失败任务会无限重试,占满队列。他设计了一个重试策略:每个任务最多重试3次,且重试间隔指数退避(1秒、2秒、4秒)。这样临时故障能恢复,永久故障不会死循环。

// 在任务类中增加重试计数
public class PushTask {
    private int retryCount = 0;
    private static final int MAX_RETRIES = 3;

    public boolean canRetry() {
        return retryCount < MAX_RETRIES;
    }

    public void incrRetry() {
        retryCount++;
    }
}

// 在处理器链外层捕获异常,决定是否重试
catch (Exception e) {
    if (task.canRetry()) {
        task.incrRetry();
        // 指数退避:2^retryCount * 1000 ms
        long delay = (long) (Math.pow(2, task.getRetryCount()) * 1000);
        Thread.sleep(delay);
        taskQueue.put(task); // 重新入队
    } else {
        logErr("任务重试" + MAX_RETRIES + "次失败,进入死信队列");
        sendToDeadLetter(task);
    }
}

5. 监控不能少:可视化面板很有必要

他最后强调,再好的框架也需要监控。他顺手给引擎加了个简单的 JMX Bean,暴露队列大小、处理速度、失败数等指标,然后用 Prometheus + Grafana 展示。这样一眼就能看出系统健康状态。

六、总结性的说两句

我们后来问他,为啥他能这么快搞出这个框架。他说其实不是他厉害,而是掌握了“三个套路”:

  1. 复用成熟模式:生产者-消费者、责任链模式这些都是经典设计模式,不用自己瞎造轮子。
  2. 抓核心需求:当时的核心需求是“消息推送不中断、能自动恢复、易扩展”,所以重点放在了“永动循环”“自愈”“可插拔”上。
  3. 踩过的坑多:他以前在谷歌搞广告系统时,遇到过各种线程问题、内存问题,所以这次直接避开了这些坑。

我们听完恍然大悟:原来厉害的程序员不是能写出多复杂的代码,而是能把简单的东西用对地方,解决实际问题。

如果你对这类解决实际业务场景的高并发、可扩展架构设计感兴趣,欢迎来 云栈社区 交流探讨,这里有更多来自一线的实战案例和技术分享。




上一篇:如何在C++中运用适配器模式解决第三方库接口不兼容问题
下一篇:Python loguru 库:替代logging的懒人神器,轻松实现日志记录与轮转
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-15 12:21 , Processed in 0.580392 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表