在上一篇探讨了ConcurrentHashMap的并发设计之后,我们转向多线程开发的另一个核心议题——线程同步控制。如何精确协调多个线程的执行顺序,确保它们在特定节点等待或同时推进,是高并发编程的关键挑战。
Java的JUC包为此提供了两个强大的工具:CountDownLatch(倒计时门闩)和CyclicBarrier(循环屏障)。它们都用于多线程同步,但设计理念和适用场景却有本质区别。本文将深入二者的核心差异,剖析其JUC源码实现,并提供清晰的实战选型与避坑指南。
一、核心差异:先理清本质,避免混淆
虽然都能实现“等待”,但CountDownLatch和CyclicBarrier从设计初衷到使用方式都不同。下表清晰地展示了它们的核心区别:
| 对比维度 |
CountDownLatch(倒计时门闩) |
CyclicBarrier(循环屏障) |
| 核心设计初衷 |
让一个/多个线程等待其他N个线程完成操作后,再继续执行 |
让N个线程互相等待,直到所有线程都到达指定“屏障点”,再同时继续执行 |
| 核心特性 |
一次性计数器,计数归0后失效,无法重置 |
可循环屏障,所有线程通过屏障后,计数器自动重置,可重复使用 |
| 底层实现 |
基于AQS共享模式,重写 tryAcquireShared/tryReleaseShared |
基于ReentrantLock + Condition实现(底层仍依赖AQS),无直接重写AQS方法 |
| 等待主体 |
等待方和被等待方分离(等待方是主线程/其他线程,被等待方是执行任务的线程) |
等待方和被等待方合一(所有线程都是等待方,也都是被等待方) |
| 计数触发方式 |
由被等待线程调用countDown()触发计数递减,可任意线程调用 |
由每个线程自己调用await()触发,只有线程到达屏障点才会调用 |
| 异常处理 |
无内置异常处理,子线程异常不会影响计数器,可能导致等待方永久阻塞 |
支持屏障任务,且有内置异常处理,线程中断/异常会触发“屏障破裂” |
| 核心方法 |
countDown()(递减计数)、await()(等待计数归0) |
await()(等待屏障)、reset()(手动重置屏障) |
| 核心状态 |
AQS状态值state = 初始计数N,countDown()让state--,直到state=0 |
内置计数器count = 初始参与线程数N,每个线程await()则count--,直到count=0 |
关键理解:二者的核心差异源于“等待的主体关系”。CountDownLatch是单向等待,一方被动等待另一方完成,任务完成后无需再次同步,因此设计为一次性。CyclicBarrier是互相等待,所有线程主动汇集到同一个点,这种多轮同步的需求自然催生了可循环的特性。
二、前置知识:AQS共享模式要点速览
为了更好理解源码,先快速回顾AQS(AbstractQueuedSynchronizer)共享模式的三个核心点,它们是CountDownLatch的基石:
- AQS核心状态:
volatile int state,所有同步逻辑围绕对state的原子修改展开。
- 共享模式核心:多个线程可同时获取共享锁。当条件满足时,所有等待的线程会被依次唤醒,适用于多线程等待同一条件的场景。
- 核心方法:共享模式下需重写
tryAcquireShared(int arg)(尝试获取锁)和tryReleaseShared(int arg)(尝试释放锁),通过CAS保证对state操作的线程安全。
简单说,CountDownLatch直接将AQS的state用作自己的计数器,其核心逻辑就是对这个state的CAS修改以及共享模式下的等待/唤醒。
三、CountDownLatch源码解析:基于AQS共享模式的一次性门闩
CountDownLatch的设计极其简洁,其所有功能都通过一个继承自AQS的内部类Sync实现,state即计数器。
1. 核心结构:极简的AQS封装
public class CountDownLatch {
// 核心内部类:继承AQS,重写共享模式方法
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count); // 初始化AQS的state为倒计时计数N
}
int getCount() {
return getState();
}
// 尝试获取共享锁:state为0时才能获取成功(等待线程可继续)
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放共享锁:CAS将state减1,仅当state变为0时才返回true触发唤醒
protected boolean tryReleaseShared(int releases) {
for (;;) { // 自旋CAS
int c = getState();
if (c == 0) // state已为0,无需释放
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc)) // CAS修改state为c-1
return nextc == 0; // 只有state变为0时,才返回true
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
}
结构总结:
CountDownLatch是AQS共享模式的极简封装,直接将state作为计数器。
- 核心是重写
tryAcquireShared(判断计数是否归零)和tryReleaseShared(实现CAS递减计数)。
2. 核心操作:countDown()与await()
所有操作都委托给内部的sync对象。
-
countDown():递减计数器
public void countDown() {
sync.releaseShared(1); // 委托给AQS,最终调用tryReleaseShared
}
核心流程:调用countDown() -> AQS执行releaseShared(1) -> 调用重写的tryReleaseShared自旋CAS将state减1 -> 若state减为0,则返回true,AQS唤醒同步队列中所有等待的线程。
-
await():等待计数归零
// 无超时等待(可中断)
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 有超时等待
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
核心流程:调用await() -> AQS执行acquireSharedInterruptibly(1) -> 调用重写的tryAcquireShared判断state是否为0:
- 若
state=0,返回1,获取锁成功,线程继续执行。
- 若
state≠0,返回-1,获取失败,线程被封装为节点加入AQS同步队列并阻塞,直到被其他线程countDown()导致state=0时唤醒。
设计感悟:CountDownLatch完美体现了“基于基础组件做极简封装”的思想。它几乎没有新增逻辑,只是根据“倒计时等待”的需求,重写了AQS共享模式的两个钩子方法,就构建出一个强大而可靠的同步工具。
四、CyclicBarrier源码解析:基于ReentrantLock+Condition的循环屏障
CyclicBarrier没有直接继承AQS,而是基于ReentrantLock和Condition实现,核心在于其内置的计数器以及支持循环的“代次”(Generation)设计。
1. 核心结构:锁、条件队列与代次
public class CyclicBarrier {
// 代次类,支持循环的关键
private static class Generation {
boolean broken = false; // 屏障是否破裂
}
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition(); // 条件队列,用于线程等待
private final int parties; // 参与屏障的固定线程数
private final Runnable barrierCommand; // 屏障任务(可选)
private Generation generation = new Generation(); // 当前屏障代次
private int count; // 动态计数器,初始值=parties
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
结构总结:
parties(固定参与者数量)和count(动态计数器)是屏障逻辑的核心。
ReentrantLock保证对count修改的原子性,Condition(trip)实现线程的条件等待。
Generation代次与broken标记是实现循环和异常处理的关键。
2. 核心操作:await()方法的全流程
CyclicBarrier的核心是await()方法,它集等待、计数、触发屏障、重置于一身。
private int dowait(boolean timed, long nanos) throws ... {
final ReentrantLock lock = this.lock;
lock.lock(); // 1. 获取锁,保证后续操作的原子性
try {
final Generation g = generation;
if (g.broken) // 屏障已破裂,抛异常
throw new BrokenBarrierException();
if (Thread.interrupted()) { // 当前线程被中断,破裂屏障
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 2. 计数器减1,index是当前线程的“到达序号”
if (index == 0) { // 3. 最后一个线程到达,触发屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 执行屏障任务
ranAction = true;
nextGeneration(); // 关键:重置屏障(count=parties,唤醒所有线程),实现循环
return 0;
} finally {
if (!ranAction) // 屏障任务执行异常,破裂屏障
breakBarrier();
}
}
// 4. 非最后一个线程,进入条件队列等待
for (;;) {
try {
if (!timed)
trip.await(); // 无限期等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos); // 超时等待
} catch (InterruptedException ie) {
// ... 中断处理,破裂屏障
}
// 被唤醒后,检查屏障状态
if (g.broken)
throw new BrokenBarrierException();
if (g != generation) // 屏障已换代(已通过),返回自己的到达序号
return index;
if (timed && nanos <= 0L) { // 超时处理,破裂屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
核心流程总结:
- 加锁与校验:获取锁,检查屏障状态和线程中断状态。
- 计数递减:
count--,得到当前线程的到达序号index。
- 判断触发:
- 若
index == 0(最后一个线程):执行屏障任务(如有),调用nextGeneration()重置计数器、唤醒所有等待线程、创建新代次,实现循环。
- 若
index != 0:进入Condition条件队列等待,并释放锁。
- 等待与唤醒:线程被唤醒后,检查屏障状态。若屏障已通过(代次已变),则返回;若屏障破裂或超时,则抛异常。
3. 关键辅助方法
nextGeneration():循环的核心。唤醒所有等待线程,重置count = parties,创建新的Generation。
breakBarrier():异常处理的核心。设置broken = true,重置count,唤醒所有等待线程,使后续线程调用await()时抛出BrokenBarrierException,避免永久阻塞。
设计感悟:CyclicBarrier展现了“基于基础锁机制做上层封装”的思路。它利用成熟的ReentrantLock和Condition,通过“代次”设计优雅地实现了循环同步,并通过“破裂”标记提供了内置的异常处理机制,使得API更健壮,更贴合“多线程互相等待”的复杂场景。
五、从源码解析看特性与实现的关联
理解源码后,我们再回头看它们的特性差异,就会发现一切都有迹可循:
| 核心特性 |
底层实现原因 |
| CountDownLatch一次性 |
直接使用AQS的state作计数器,state=0后,AQS共享模式没有重置state的机制。 |
| CyclicBarrier可循环 |
使用独立的count计数器,并通过Generation代次设计,在nextGeneration()中主动重置count=parties。 |
| CountDownLatch等待方与被等待方分离 |
countDown()和await()可由不同线程随意调用,二者在AQS层面无绑定关系。 |
| CyclicBarrier线程互相等待 |
计数递减(--count)内嵌在await()方法中,每个参与者必须调用await()才能推进流程。 |
| CyclicBarrier内置异常处理 |
通过Generation.broken标记和breakBarrier()方法,在中断或超时后能主动破裂屏障,唤醒所有线程并抛异常。 |
六、实战指引:精准选型与避坑实践
1. 精准选型:一句话原则
- 用CountDownLatch:当一个或多个线程需要等待其他线程完成某项一次性的任务时。例如:主线程等待所有数据加载线程完毕;服务启动时等待多个组件初始化完成。
- 用CyclicBarrier:当多个线程需要互相等待,到达同一个节点后,再一起向下执行,且可能重复多轮时。例如:多线程分阶段处理数据,每阶段需同步;模拟并发测试,等待所有线程就绪后同时发起请求。
2. 避坑技巧与最佳实践
CountDownLatch高频坑点:
- 计数不匹配:初始计数N大于实际调用
countDown()的次数,导致等待线程永久阻塞。
- 解决:确保N等于必须完成的任务数,并在每个任务逻辑的
finally块中调用countDown()。
- 无限期等待:使用了无超时的
await(),因任务异常导致永久阻塞。
- 解决:优先使用带超时的
await(long timeout, TimeUnit unit),并设置合理的超时时间进行降级处理。
- 误重复使用:计数归零后,该
CountDownLatch实例失效,再次调用await()会立即返回。
- 解决:明确其一次性,需要多次同步时,应创建新实例或改用
CyclicBarrier。
CyclicBarrier高频坑点:
- 参与者不匹配:构造时指定的
parties数多于实际调用await()的线程数。
- 解决:确保参与线程数等于
parties,并在finally块中调用await()。
- 忽略屏障破裂异常:线程中断或超时导致屏障破裂,若不处理
BrokenBarrierException,程序逻辑会出错。
- 解决:调用
await()时必须捕获BrokenBarrierException,根据业务决定是重置屏障(reset())还是终止流程。
- 屏障任务过重:屏障任务由最后一个到达的线程执行,若耗时过长会阻塞其他已被唤醒的线程。
- 解决:屏障任务应保持轻量,复杂操作可提交给线程池执行。
通用最佳实践:
- 资源释放:将
countDown()和await()调用置于finally块中,确保执行。
- 超时保护:总是倾向于使用带超时参数的方法,增强系统健壮性。
- AQS结合线程池:与
CountDownLatch配合时,使用线程池管理子任务,便于资源管理和异常收集。
- 谨慎重置:
CyclicBarrier.reset()方法会强制破裂当前屏障,应仅在明确需要重新同步时使用。
掌握CountDownLatch和CyclicBarrier的底层原理与适用场景,能帮助我们在面对复杂的多线程同步问题时,做出准确、高效的技术选型,并写出更健壮的并发代码。