找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2117

积分

1

好友

287

主题
发表于 2025-12-31 00:34:52 | 查看: 20| 回复: 0

在高并发编程中,频繁创建和销毁线程会带来巨大的性能开销。为了解决这个问题,Java 提供了 java.util.concurrent.ThreadPoolExecutor 线程池机制,通过复用已有线程来执行多个任务,从而显著提升系统吞吐量和资源利用率。

但你是否曾好奇:线程池中的线程究竟是如何被“复用”的? 它们是如何从一个任务切换到另一个任务的?本文将深入 ThreadPoolExecutor 的底层源码,结合图文与代码,为你揭开线程复用的神秘面纱,更多关于高并发编程的内容可以访问 后端 & 架构 板块进行交流。

线程池的基本结构回顾

首先,我们快速回顾一下 ThreadPoolExecutor 的核心组件与构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:线程池中用来工作的核心线程数量。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:超出 corePoolSize 后创建的线程存活时间或者是所有线程最大存活时间,取决于配置。
  • unit:keepAliveTime 的时间单位。
  • workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
  • threadFactory:线程池内部创建线程所用的工厂。
  • handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

当提交一个任务时,线程池会按以下逻辑处理(简化版):

  1. 如果当前线程数 < corePoolSize → 创建新线程执行任务;
  2. 否则,尝试将任务放入 workQueue;
  3. 如果队列满且线程数 < maximumPoolSize → 创建新线程;
  4. 否则,触发拒绝策略。

图1:ThreadPoolExecutor任务提交与线程管理流程图
ThreadPoolExecutor任务提交流程

这个逻辑清晰地体现在 execute 方法中(基于 OpenJDK 8 源码):

public void execute(Runnable command) {
    if (command == null)            // 1. 判空
        throw new NullPointerException();
    int c = ctl.get();              // 2. 原子读 ctl:高 3 位存 runState,低 29 位存 workerCount
    // 3. 步骤一:线程数 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))   // true 表示“核心线程”
            return;                     // 成功就直接返回
        c = ctl.get();                  // 失败再读一次
    }
    // 4. 步骤二:线程池 RUNNING 且队列能 offer,将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 5. 双重检查:如果突然 SHUTDOWN,需要回滚
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)   // 防止第一个线程都没
            addWorker(null, false);
    }
    // 6. 步骤三:队列已满,尝试创建新线程(非核心线程)
    else if (!addWorker(command, false))
        // 7. 步骤四: 队列已满且线程数已达最大,执行拒绝策略
        reject(command);
}

但关键问题来了:这些线程执行完一个任务后,是怎么继续执行下一个任务的?
答案就藏在 Worker 类和它的 run() 方法中。

线程复用的核心:Worker 与 runWorker()

3.1 Worker 是什么?

WorkerThreadPoolExecutor 的一个内部类,它实现了 Runnable 接口,并持有一个 Thread 对象:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

每个 Worker 实例对应一个线程(thread),并且绑定一个初始任务(firstTask)。当线程启动时,会调用 run(),进而执行 runWorker(this)。了解这类 JUC 核心类的内部原理,是深入Java并发的关键。

3.2 runWorker():线程复用的关键方法

我们来看 runWorker(Worker w) 的核心逻辑(JDK 8 源码节选并简化):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 取出第一个任务
    w.firstTask = null;

    try {
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取锁,防止中断
            try {
                beforeExecute(wt, task); // 钩子方法
                try {
                    task.run(); // 执行任务!
                } finally {
                    afterExecute(task, null); // 钩子方法
                }
            } finally {
                w.unlock();
                task = null;
                w.completedTasks++;
            }
        }
    } finally {
        processWorkerExit(w, true); // 线程退出
    }
}

关键点解析:

  • while 循环:这是线程复用的核心!线程不会在执行完一个任务后就结束,而是持续从任务队列中获取新任务
  • getTask():从 workQueue 中阻塞或非阻塞地获取下一个任务。
  • task.run():直接调用任务的 run() 方法(注意:不是 start()!因为线程已经在运行了)。

结论:线程复用的本质是——一个线程在一个无限循环中不断从队列取任务并执行,而不是“执行完就销毁”。

四、getTask():任务是如何被取出的?

getTask() 方法决定了线程何时阻塞、何时超时、何时退出。其核心逻辑如下(简化):

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池已关闭且队列为空,返回 null,线程退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否允许超时(取决于是否超过 corePoolSize)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 从队列中取任务:可能阻塞!
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // 阻塞直到有任务
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

重点说明:

  • 如果线程数 ≤ corePoolSize 且未开启 allowCoreThreadTimeOut,则 take()永久阻塞,直到有新任务到来。
  • 如果线程数 > corePoolSize,则使用 poll(timeout)超时后返回 null,导致线程退出。
  • 返回 null 后,runWorker 的 while 循环结束,线程自然终止。

这就是为什么核心线程默认不会被回收,而非核心线程会在空闲 keepAliveTime 后被回收。

五、实战演示:观察线程复用

我们写一个简单例子验证:

public class ThreadPoolReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10)
        );

        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId +
                    " executed by thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

输出结果

Task 0 executed by thread: pool-1-thread-1
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-1
Task 3 executed by thread: pool-1-thread-1
Task 4 executed by thread: pool-1-thread-1

所有任务都由同一个线程执行!这就是线程复用的直接证据。

六、常见误区澄清

误区 正确理解
“线程池里的线程会自动切换任务” 实际是同一个线程循环取任务执行,并非“切换”
“每个任务都新建线程” 只有超出 corePoolSize 且队列满时才新建
“核心线程永远不会死” 若设置 allowCoreThreadTimeOut(true),核心线程也会超时回收

七、总结

  • 线程复用的本质:线程在一个 while 循环中不断从任务队列获取并执行任务。
  • 关键方法runWorker() + getTask()
  • Worker 封装了线程和任务,是线程池工作的基本单元。
  • 线程生命周期由 getTask() 返回值控制:返回 null 则线程退出。

通过这种设计,Java 线程池避免了频繁创建/销毁线程的开销,极大提升了并发性能。理解这一机制,不仅能写出更高效的代码,还能在排查线程池问题时游刃有余。希望本文的解析对你有所帮助,如果你想探讨更多技术话题,欢迎访问 云栈社区 与开发者们一同交流成长。




上一篇:谷歌购物广告架构与策略实战指南:从PLA到Pmax的避坑指南
下一篇:Zen Browser开源浏览器评测:垂直标签页与分屏浏览的极致生产力工具
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 18:36 , Processed in 0.367177 second(s), 40 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表