在高并发编程中,频繁创建和销毁线程的开销不容忽视。为了优化这一过程,Java 提供了 java.util.concurrent.ThreadPoolExecutor 线程池。它通过复用已有线程来执行多个任务,从而显著提升系统吞吐量。
但你是否思考过:线程池中的线程是如何完成“复用”的?它们怎样在任务之间切换?本文将带你深入 ThreadPoolExecutor 的源码,结合清晰的流程图和代码演示,为你揭示线程复用的核心机制。想深入了解更多的Java并发知识,可以关注云栈社区的后续更新。
一、线程池的基本结构回顾
首先,我们快速回顾一下 ThreadPoolExecutor 的核心构造参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:线程池中用来工作的核心线程数量。
- maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
- keepAliveTime:超出 corePoolSize 后创建的线程空闲存活时间(当
allowCoreThreadTimeOut为false时)。
- unit:
keepAliveTime 的时间单位。
- workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
- threadFactory:线程池内部创建线程所用的工厂。
- handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。
当一个新任务提交时,线程池的处理逻辑(简化版)如下:
- 如果当前工作线程数 <
corePoolSize → 创建新线程执行任务。
- 否则,尝试将任务放入
workQueue。
- 如果队列满且工作线程数 <
maximumPoolSize → 创建新线程。
- 否则,触发拒绝策略。
这个决策流程可以通过下面的流程图直观理解:

execute(Runnable command) 方法的源码(OpenJDK 8)清晰地体现了这一逻辑:
// 代码版本:OpenJDK 8
public void execute(Runnable command) {
if (command == null) // 1. 判空
throw new NullPointerException();
int c = ctl.get(); // 2. 原子读 ctl:高 3 位存 runState,低 29 位存 workerCount
// 3. 步骤一:线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // true 表示“核心线程”
return; // 成功就直接返回
c = ctl.get(); // 失败再读一次
}
// 4. 步骤二:线程池 RUNNING 且队列能 offer,将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 5. 双重检查:如果突然 SHUTDOWN,需要回滚
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) // 防止第一个线程都没
addWorker(null, false);
}
// 6. 步骤三:队列已满,尝试创建新线程(非核心线程)
else if (!addWorker(command, false))
// 7. 步骤四: 队列已满且线程数已达最大,执行拒绝策略
reject(command);
}
现在,核心问题来了:这些线程执行完一个任务后,是如何继续执行下一个任务的?
答案就藏在 Worker 类和它的 run() 方法中,这正是实现高并发场景下资源复用的关键。
二、线程复用的核心:Worker 与 runWorker()
2.1 Worker 是什么?
Worker 是 ThreadPoolExecutor 的一个内部类,它实现了 Runnable 接口,并持有一个 Thread 对象:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
每个 Worker 实例都对应一个具体的线程(thread),并可能绑定一个初始任务(firstTask)。当这个线程启动时,会执行 Worker 自己的 run() 方法,进而调用核心的 runWorker(this)。
2.2 runWorker():线程复用的关键方法
runWorker(Worker w) 的核心逻辑(基于JDK 8源码简化)如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 取出第一个任务
w.firstTask = null;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); // 获取锁,防止中断
try {
beforeExecute(wt, task); // 钩子方法
try {
task.run(); // 执行任务!
} finally {
afterExecute(task, null); // 钩子方法
}
} finally {
w.unlock();
task = null;
w.completedTasks++;
}
}
} finally {
processWorkerExit(w, true); // 线程退出
}
}
关键点解析:
while 循环:这是线程复用的核心所在!线程不会在执行完一个任务后就结束,而是持续从任务队列中获取新任务来执行。
getTask():这个方法负责从 workQueue 中阻塞或非阻塞地获取下一个任务。
task.run():注意,这里是直接调用任务的 run() 方法,而不是启动新线程的 start(),因为承载这个 Worker 的线程已经在运行了。
结论:线程复用的本质是——一个线程在一个无限循环中不断从阻塞队列取任务并执行,而不是“执行完一个就销毁,再创建新的”。
三、getTask():任务是如何被取出的?
getTask() 方法决定了线程何时阻塞等待、何时超时、何时应该退出。其核心逻辑如下(简化版):
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池已关闭且队列为空,返回 null,线程退出
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否允许超时(取决于是否超过 corePoolSize)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从队列中取任务:可能阻塞!
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 阻塞直到有任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
重点说明:
- 如果工作线程数 ≤
corePoolSize 且未开启 allowCoreThreadTimeOut,那么 getTask() 会调用 take() 永久阻塞,直到有新任务到来。
- 如果工作线程数 >
corePoolSize(或者开启了核心线程超时),则会使用 poll(timeout) 进行限时等待,超时后返回 null,从而导致线程退出。
- 当
getTask() 返回 null 时,runWorker 中的 while 循环条件不再满足,循环结束,线程自然终止。
这就是为什么核心线程默认不会被回收,而非核心线程会在空闲 keepAliveTime 后被回收的设计原理。
四、实战演示:观察线程复用
我们编写一个简单的例子来验证线程复用:
public class ThreadPoolReuseDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10)
);
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId +
" executed by thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
输出结果:
Task 0 executed by thread: pool-1-thread-1
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-1
Task 3 executed by thread: pool-1-thread-1
Task 4 executed by thread: pool-1-thread-1
可以看到,所有5个任务都由同一个线程执行!这是线程复用的最直接证据。
五、常见误区澄清
| 误区 |
正确理解 |
| “线程池里的线程会自动切换任务” |
实际是同一个线程循环取任务执行,并非操作系统层面的“线程切换” |
| “每个任务都新建线程” |
只有在超出 corePoolSize 且队列满时,才会新建非核心线程 |
| “核心线程永远不会死” |
若调用 setAllowCoreThreadTimeOut(true),核心线程在空闲超时后也会被回收 |
六、总结
- 线程复用的本质:线程在
runWorker() 方法的一个 while 循环中,不断调用 getTask() 从任务队列获取新任务并执行。
- 关键方法:
runWorker() 是实现循环的驱动力,getTask() 是获取任务的控制器。
- Worker的角色:
Worker 封装了线程和首个任务,是线程池工作的基本执行单元。
- 线程生命周期:由
getTask() 的返回值控制,返回 null 则线程退出循环并结束。
通过 Worker + runWorker() + getTask() 这套精妙的设计,ThreadPoolExecutor 有效避免了频繁创建与销毁线程的系统开销,极大地提升了并发性能。深入理解这一机制,不仅能帮助我们在计算机基础层面理解并发模型,更能让我们在开发中更好地使用线程池,并在遇到相关性能问题或异常时,能够快速、准确地定位根源。