在Java中,每一个对象都内置了一套监视器方法,如wait()、notify()和notifyAll(),它们与synchronized关键字配合使用,可以实现基础的线程间等待与通知机制。而Condition接口位于java.util.concurrent.locks包中,它提供了一套能力更强大的线程协作工具。与Lock接口配合使用,Condition不仅能实现Object.wait()/notify()的功能,还支持多个独立的等待条件,使得在复杂的并发编程场景下,线程的调度与管理更加精细和灵活。
Condition 使用介绍
Condition对象需要通过Lock对象来创建(调用Lock.newCondition()方法)。调用Condition的await()或signal()方法前,必须已经持有与之关联的锁。当线程调用await()方法后,它会释放锁并进入等待状态;直到其他线程调用signal()方法进行通知,并且该等待线程成功重新竞争到锁之后,才会从await()方法返回。
下面我们以经典的生产者-消费者模型为例,展示如何使用Condition和ReentrantLock来实现一个线程安全的缓冲区。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition(); // 缓冲区非空条件
private final Condition notFull = lock.newCondition(); // 缓冲区未满条件
private final int[] buffer = new int[5];
private int count = 0; // 当前元素数量
private int putIndex = 0;
private int takeIndex = 0;
// 生产者方法
public void produce(int item) throws InterruptedException {
lock.lock();
try {
// 使用 while 而不是 if 判断条件,防止虚假唤醒
while (count == buffer.length) {
// 缓冲区满,等待 notFull 条件
// await() 会自动释放锁,并在被唤醒后重新获取锁
notFull.await();
}
buffer[putIndex] = item;
putIndex = (putIndex + 1) % buffer.length;
count++;
System.out.println("Produced: " + item + ", count=" + count);
// 通知消费者:缓冲区非空
// signal() 只唤醒一个等待线程,signalAll() 则唤醒所有
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 消费者方法
public int consume() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
// 缓冲区空,等待 notEmpty 条件
notEmpty.await();
}
int item = buffer[takeIndex];
takeIndex = (takeIndex + 1) % buffer.length;
count--;
System.out.println("Consumed: " + item + ", count=" + count);
// 通知生产者:缓冲区未满
notFull.signal();
return item;
} finally {
// 在finally块中释放锁,确保异常情况下也能解锁
lock.unlock();
}
}
// 测试
public static void main(String[] args) {
ProducerConsumerExample pc = new ProducerConsumerExample();
Thread producer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
pc.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread consumer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
pc.consume();
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
}
}
在这个例子中,我们创建了两个Condition对象:notFull和notEmpty。生产者线程在缓冲区满时等待notFull条件,消费者线程在缓冲区空时等待notEmpty条件。这种设计比单个等待队列(如使用Object.wait())的逻辑更清晰,也更高效,避免了不必要的唤醒。实际上,JUC中的BlockingQueue正是基于类似的原理实现的。
Condition 原理
在JUC(java.util.concurrent)框架中,Condition并非独立存在,它与Lock接口及AQS(AbstractQueuedSynchronizer)框架深度集成。Condition接口的核心实现是ConditionObject,它是AQS的一个内部类。

每个ConditionObject实例都维护着一个单向的FIFO等待队列。队列中的节点类型与AQS同步队列的节点类型相同,都是AbstractQueuedSynchronizer.Node。当一个线程调用Condition.await()时,它就会被封装成节点加入这个等待队列。

这里有一个重要的概念区别:使用内置synchronized锁时,一个对象关联一个同步队列和一个等待队列。而基于AQS的Lock(如ReentrantLock)则拥有一个同步队列和多个由Condition创建的等待队列。

await() 等待过程
当一个已经持有锁的线程调用Condition.await()时,会发生以下事情:
- 构造新节点,将其加入当前
Condition等待队列的尾部。
- 完全释放当前线程持有的锁(包括重入次数),并唤醒AQS同步队列中的后继节点。
- 当前线程进入阻塞状态,等待被唤醒。
从队列视角看,await()操作相当于将AQS同步队列的首节点(即持有锁的节点)移动到Condition的等待队列尾部。

await() 核心源码逻辑(JDK 8):
public final void await() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// 1. 将当前线程加入Condition等待队列
Node node = addConditionWaiter();
// 2. 完全释放锁,并保存锁状态(用于后续重入)
long savedState = fullyRelease(node);
int interruptMode = 0;
// 3. 循环检查是否已被转移到同步队列,若未转移则阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 4. 在同步队列中重新竞争获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理取消的节点,防止内存泄漏
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal() 通知过程
调用signal()的线程必须已经持有与Condition关联的锁。该方法会执行以下操作:
- 获取
Condition等待队列中的第一个(等待最久的)节点。
- 将该节点从等待队列移动到AQS的同步队列尾部。
- 调用
LockSupport.unpark()唤醒该节点对应的线程。
被唤醒的线程会从await()方法内的LockSupport.park()处返回,然后进入while (!isOnSyncQueue(node))循环,此时因为节点已在同步队列中,循环条件不成立,跳出循环。接着,它会调用acquireQueued()方法在同步队列中尝试重新竞争锁。只有成功获取到锁之后,才会从await()方法正式返回。
signalAll()方法则是对等待队列中的每一个节点都执行一次signal()操作,将它们全部移动到同步队列中并唤醒。
从队列视角看,signal()相当于从Condition等待队列取出头节点,重新加入AQS同步队列参与锁竞争。

signal() 核心源码逻辑(JDK 8):
public final void signal() {
// 检查调用者是否持有独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 将头节点从等待队列中移除
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 尝试将节点转移到同步队列,成功则结束
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
Object Monitor 与 Condition 对比
下表详细对比了两种线程协作机制的主要差异:
| 对比维度 |
Object 监视器(synchronized + wait/notify) |
Condition(ReentrantLock + Condition) |
| 所属机制 |
JVM内置原生支持 |
JUC包提供的显式Lock接口机制 |
| 锁获取方式 |
隐式,通过synchronized自动管理 |
显式,需手动调用lock()/unlock() |
| 条件队列数量 |
每个对象仅有1个等待队列 |
每个Lock可创建多个独立的Condition等待队列 |
| 条件等待方法 |
wait(), wait(long), wait(long, int) |
await(), awaitNanos(), awaitUntil(), awaitUninterruptibly()等 |
| 唤醒方法 |
notify(), notifyAll() |
signal(), signalAll() |
| 中断响应 |
wait()可被中断并抛异常 |
提供可中断(await())、不可中断(awaitUninterruptibly())等多种选择 |
| 锁释放行为 |
调用wait()时释放锁,唤醒后自动重入 |
调用await()时完全释放锁(含重入次数),返回前需重新竞争 |
| 使用前提 |
必须在synchronized同步块内调用 |
必须在持有对应Lock锁的情况下调用 |
| 公平性支持 |
不支持 |
ReentrantLock可配置为公平锁 |
| 典型应用场景 |
简单的线程间协作 |
复杂的多条件协作(如线程池、阻塞队列等) |