找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

5048

积分

0

好友

669

主题
发表于 1 小时前 | 查看: 2| 回复: 0

在高并发编程中,频繁创建和销毁线程的开销不容忽视。为了优化这一过程,Java 提供了 java.util.concurrent.ThreadPoolExecutor 线程池。它通过复用已有线程来执行多个任务,从而显著提升系统吞吐量。

但你是否思考过:线程池中的线程是如何完成“复用”的?它们怎样在任务之间切换?本文将带你深入 ThreadPoolExecutor 的源码,结合清晰的流程图和代码演示,为你揭示线程复用的核心机制。想深入了解更多的Java并发知识,可以关注云栈社区的后续更新。

一、线程池的基本结构回顾

首先,我们快速回顾一下 ThreadPoolExecutor 的核心构造参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize:线程池中用来工作的核心线程数量。
  • maximumPoolSize:最大线程数,线程池允许创建的最大线程数。
  • keepAliveTime:超出 corePoolSize 后创建的线程空闲存活时间(当allowCoreThreadTimeOutfalse时)。
  • unitkeepAliveTime 的时间单位。
  • workQueue:任务队列,是一个阻塞队列,当线程数达到核心线程数后,会将任务存储在阻塞队列中。
  • threadFactory:线程池内部创建线程所用的工厂。
  • handler:拒绝策略;当队列已满并且线程数量达到最大线程数量时,会调用该方法处理任务。

当一个新任务提交时,线程池的处理逻辑(简化版)如下:

  1. 如果当前工作线程数 < corePoolSize → 创建新线程执行任务。
  2. 否则,尝试将任务放入 workQueue
  3. 如果队列满且工作线程数 < maximumPoolSize → 创建新线程。
  4. 否则,触发拒绝策略。

这个决策流程可以通过下面的流程图直观理解:

ThreadPoolExecutor任务提交与执行核心流程

execute(Runnable command) 方法的源码(OpenJDK 8)清晰地体现了这一逻辑:

// 代码版本:OpenJDK 8
public void execute(Runnable command) {
    if (command == null)            // 1. 判空
        throw new NullPointerException();
    int c = ctl.get();              // 2. 原子读 ctl:高 3 位存 runState,低 29 位存 workerCount
    // 3. 步骤一:线程数 < corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))   // true 表示“核心线程”
            return;                     // 成功就直接返回
        c = ctl.get();                  // 失败再读一次
    }
    // 4. 步骤二:线程池 RUNNING 且队列能 offer,将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 5. 双重检查:如果突然 SHUTDOWN,需要回滚
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)   // 防止第一个线程都没
            addWorker(null, false);
    }
    // 6. 步骤三:队列已满,尝试创建新线程(非核心线程)
    else if (!addWorker(command, false))
        // 7. 步骤四: 队列已满且线程数已达最大,执行拒绝策略
        reject(command); 
}

现在,核心问题来了:这些线程执行完一个任务后,是如何继续执行下一个任务的?
答案就藏在 Worker 类和它的 run() 方法中,这正是实现高并发场景下资源复用的关键。

二、线程复用的核心:Worker 与 runWorker()

2.1 Worker 是什么?

WorkerThreadPoolExecutor 的一个内部类,它实现了 Runnable 接口,并持有一个 Thread 对象:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

每个 Worker 实例都对应一个具体的线程(thread),并可能绑定一个初始任务(firstTask)。当这个线程启动时,会执行 Worker 自己的 run() 方法,进而调用核心的 runWorker(this)

2.2 runWorker():线程复用的关键方法

runWorker(Worker w) 的核心逻辑(基于JDK 8源码简化)如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 取出第一个任务
    w.firstTask = null;

    try {
        while (task != null || (task = getTask()) != null) {
            w.lock(); // 获取锁,防止中断
            try {
                beforeExecute(wt, task); // 钩子方法
                try {
                    task.run(); // 执行任务!
                } finally {
                    afterExecute(task, null); // 钩子方法
                }
            } finally {
                w.unlock();
                task = null;
                w.completedTasks++;
            }
        }
    } finally {
        processWorkerExit(w, true); // 线程退出
    }
}

关键点解析:

  • while 循环:这是线程复用的核心所在!线程不会在执行完一个任务后就结束,而是持续从任务队列中获取新任务来执行。
  • getTask():这个方法负责从 workQueue 中阻塞或非阻塞地获取下一个任务。
  • task.run():注意,这里是直接调用任务的 run() 方法,而不是启动新线程的 start(),因为承载这个 Worker 的线程已经在运行了。

结论:线程复用的本质是——一个线程在一个无限循环中不断从阻塞队列取任务并执行,而不是“执行完一个就销毁,再创建新的”。

三、getTask():任务是如何被取出的?

getTask() 方法决定了线程何时阻塞等待、何时超时、何时应该退出。其核心逻辑如下(简化版):

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池已关闭且队列为空,返回 null,线程退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否允许超时(取决于是否超过 corePoolSize)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 从队列中取任务:可能阻塞!
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take(); // 阻塞直到有任务
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

重点说明:

  • 如果工作线程数 ≤ corePoolSize 且未开启 allowCoreThreadTimeOut,那么 getTask() 会调用 take() 永久阻塞,直到有新任务到来。
  • 如果工作线程数 > corePoolSize(或者开启了核心线程超时),则会使用 poll(timeout) 进行限时等待,超时后返回 null,从而导致线程退出。
  • getTask() 返回 null 时,runWorker 中的 while 循环条件不再满足,循环结束,线程自然终止。

这就是为什么核心线程默认不会被回收,而非核心线程会在空闲 keepAliveTime 后被回收的设计原理。

四、实战演示:观察线程复用

我们编写一个简单的例子来验证线程复用:

public class ThreadPoolReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10)
        );

        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + 
                    " executed by thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

输出结果:

Task 0 executed by thread: pool-1-thread-1
Task 1 executed by thread: pool-1-thread-1
Task 2 executed by thread: pool-1-thread-1
Task 3 executed by thread: pool-1-thread-1
Task 4 executed by thread: pool-1-thread-1

可以看到,所有5个任务都由同一个线程执行!这是线程复用的最直接证据。

五、常见误区澄清

误区 正确理解
“线程池里的线程会自动切换任务” 实际是同一个线程循环取任务执行,并非操作系统层面的“线程切换”
“每个任务都新建线程” 只有在超出 corePoolSize 且队列满时,才会新建非核心线程
“核心线程永远不会死” 若调用 setAllowCoreThreadTimeOut(true),核心线程在空闲超时后也会被回收

六、总结

  • 线程复用的本质:线程在 runWorker() 方法的一个 while 循环中,不断调用 getTask() 从任务队列获取新任务并执行。
  • 关键方法runWorker() 是实现循环的驱动力,getTask() 是获取任务的控制器。
  • Worker的角色Worker 封装了线程和首个任务,是线程池工作的基本执行单元。
  • 线程生命周期:由 getTask() 的返回值控制,返回 null 则线程退出循环并结束。

通过 Worker + runWorker() + getTask() 这套精妙的设计,ThreadPoolExecutor 有效避免了频繁创建与销毁线程的系统开销,极大地提升了并发性能。深入理解这一机制,不仅能帮助我们在计算机基础层面理解并发模型,更能让我们在开发中更好地使用线程池,并在遇到相关性能问题或异常时,能够快速、准确地定位根源。




上一篇:ThreadLocal源码解析与内存泄漏防护:Java线程隔离实现深度剖析
下一篇:AI无人机系统开发中,产品经理与需求的10个爆笑对话实录
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-11 05:35 , Processed in 0.796811 second(s), 41 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表