在高并发与多线程应用中,协调不同线程之间的执行顺序和资源共享至关重要。java.util.concurrent(JUC)包提供了一系列强大的线程协作工具,它们基于底层并发框架构建,极大地简化了并发编程模型。本文将深入解析CountDownLatch、CyclicBarrier、Phaser、Semaphore和Exchanger这五大核心工具类的原理、使用场景及最佳实践。
CountDownLatch(JDK 5+):倒计时门闩
CountDownLatch 用于实现“一个或多个线程等待其他一组线程完成操作”的场景。它通过一个计数器进行同步,初始化时设定计数值,线程调用countDown()使计数减一,而调用await()的线程会阻塞直到计数器归零。
核心 API 与使用示例
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 正在执行任务...");
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 任务完成,计数减一
}
}).start();
}
latch.await(); // 主线程等待所有任务完成
System.out.println("所有任务已完成,主线程继续执行!");
}
}
典型应用场景
- 服务启动检查:主线程等待数据库、缓存等核心组件全部初始化完成后,再启动对外服务。
- 并行任务汇总:多个子任务并发执行,主任务需等待所有子任务完成后才能进行结果合并。
注意事项与原理
- 不可重用:计数器归零后,该
CountDownLatch实例的使命就结束了,如需重用,请考虑CyclicBarrier。
- 防死锁:确保所有线程最终都会调用
countDown(),通常应将其置于finally块中。
- 可中断:
await()方法会响应中断。
- 实现原理:其核心基于AQS(AbstractQueuedSynchronizer)框架。内部状态
state代表剩余计数值。await()对应获取共享锁,countDown()对应释放共享锁。想要深入理解Java并发锁机制,可以参考网络与系统底层原理相关的知识。
CyclicBarrier(JDK 5+):循环屏障
CyclicBarrier 让一组线程互相等待,直到所有线程都到达一个公共屏障点后,才能继续执行。它支持一个可选的“屏障动作”(Runnable),在所有线程到达后执行,并且可以重置后重复使用。
核心 API 与使用示例
public class CyclicBarrierExample {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有线程已到达屏障,执行汇总操作!");
});
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 第一阶段完成");
barrier.await(); // 等待其他线程
System.out.println(Thread.currentThread().getName() + " 第二阶段开始");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
典型应用场景
- 多阶段计算:如并行计算中,每个线程完成一个阶段计算后,在屏障点同步并交换数据,然后进入下一阶段。
- 模拟并发:在压力测试中,使用它让所有请求线程准备就绪后同时发起。
注意事项与原理
- 可重用:当所有等待线程被释放后,屏障会自动重置,可迎接下一轮同步。
- 屏障损坏:若等待线程被中断、超时或
reset()被调用,屏障会进入“损坏”状态,后续调用await()将抛出BrokenBarrierException。
- 实现原理:其内部使用
ReentrantLock和Condition实现,而非AQS。线程调用await()时在Condition上等待,最后一个线程到达后执行屏障动作并唤醒所有等待线程。
CyclicBarrier 与 CountDownLatch 对比
| 特性 |
CountDownLatch |
CyclicBarrier |
| 计数方向 |
减到0 |
加到指定值 |
| 是否可重用 |
否 |
是 |
| 主要参与者 |
两类角色(等待线程、计数线程) |
一类角色(所有线程相互等待) |
| 核心方法 |
await(), countDown() |
await() |
Phaser(JDK 7+):灵活的阶段同步器
Phaser 是功能更为强大的同步器,可以看作是CyclicBarrier的增强版。它支持动态注册/注销参与者、多阶段(Phase)同步以及分层(Tiering) 结构,适用于复杂的多阶段任务协调。
核心概念与使用示例
public class PhaserExample {
public static void main(String[] args) {
Phaser phaser = new Phaser(1); // 主线程注册自己,用于控制阶段
for (int i = 0; i < 3; i++) {
phaser.register(); // 为每个工作线程注册
new Thread(() -> {
for (int phase = 0; phase < 3; phase++) {
System.out.println(Thread.currentThread().getName() + " 完成阶段 " + phase);
// 到达并等待本阶段其他参与者
phaser.arriveAndAwaitAdvance();
}
// 任务结束,注销自己
phaser.arriveAndDeregister();
}).start();
}
// 主线程解除注册,并等待所有阶段完成
phaser.arriveAndDeregister();
while (!phaser.isTerminated()) {
Thread.yield();
}
System.out.println("所有阶段已完成!");
}
}
高级特性:分层结构
当参与者数量巨大时,单一的Phaser可能成为性能瓶颈。Phaser支持构建树状父子结构,子Phaser向父Phaser注册,组内线程先在子Phaser同步,再由子Phaser的代表与父Phaser同步,从而减少锁竞争。
Phaser 与 CyclicBarrier 对比
| 特性 |
CyclicBarrier |
Phaser |
| 动态调整 |
参与者数量固定 |
支持register(), arriveAndDeregister() |
| 阶段控制 |
隐式循环 |
显式阶段号(Phase),可获取当前阶段 |
| 分层支持 |
不支持 |
支持父子结构,适合大规模并发 |
| 终止机制 |
损坏后不可用 |
参与者数为0时自动终止,可查询状态 |
Semaphore(JDK 5+):信号量
Semaphore 用于控制同时访问特定资源的线程数量,它通过维护一组“许可”(permits)来实现流量控制或资源池化。
核心 API 与使用示例
public class SemaphoreExample {
private static final Semaphore semaphore = new Semaphore(2); // 最多允许2个线程并发访问
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println(Thread.currentThread().getName() + " 获取许可,正在执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println(Thread.currentThread().getName() + " 释放许可");
}
}).start();
}
}
}
典型应用场景
- 资源池管理:如数据库连接池,限制最大连接数。
- 流量控制:对高并发接口进行限流,例如每秒最多处理N个请求。
- 互斥锁:当许可数为1时,
Semaphore可退化为一个互斥锁。
注意事项与原理
- 公平性:构造函数可指定是否为公平模式,公平模式下按申请顺序获取许可。
- 非阻塞获取:
tryAcquire()方法尝试获取许可,失败立即返回,不会阻塞。
- 实现原理:同样基于AQS框架。
state值表示可用许可数。acquire()是获取共享锁,release()是释放共享锁。
Exchanger(JDK 5+):数据交换器
Exchanger 提供了一个同步点,用于配对的两个线程之间交换数据。每个线程在到达交换点后,调用exchange(V x)方法,将自己的数据传入,并阻塞直到配对线程也调用了此方法,然后接收对方的数据并返回。
使用示例
public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "Thread-A 的数据";
System.out.println("A 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("A 收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
Thread.sleep(1000);
String data = "Thread-B 的数据";
System.out.println("B 准备交换: " + data);
String received = exchanger.exchange(data);
System.out.println("B 收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
典型应用场景
- 双缓冲技术:一个线程填充缓冲区A,另一个线程填充缓冲区B,填充完成后两者交换缓冲区,实现高效的读写分离。
- 遗传算法:配对线程交换染色体数据进行交叉操作。
注意事项
- 仅支持两两配对,如果三个线程调用
exchange(),后到的线程将因为没有配对线程而一直等待。
- 其底层实现采用了无锁的
CAS操作和slot(槽位)机制,以实现高效的数据交换。
总结与选型建议
| 工具类 |
核心目的 |
是否可重用 |
适用场景 |
| CountDownLatch |
一等多 |
否 |
启动准备、并行任务汇总 |
| CyclicBarrier |
多等多 |
是 |
分阶段并行计算、多线程测试 |
| Phaser |
动态多阶段同步 |
是 |
复杂的、参与者可变的多阶段任务(如工作流) |
| Semaphore |
控制并发数 |
是 |
限流、资源池(如连接池) |
| Exchanger |
两线程交换数据 |
是 |
双缓冲、配对线程数据交换 |
在选择时,应首先明确同步需求:是等待完成(CountDownLatch)、阶段同步(CyclicBarrier/Phaser)、控制并发(Semaphore)还是数据交换(Exchanger)。对于复杂的、参与者动态变化的多阶段任务,Phaser是最佳选择。这些工具都是构建健壮、高效的Java高并发应用的基础构件,深入理解其原理和使用场景,能帮助开发者更好地驾驭并发编程。