找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

683

积分

0

好友

91

主题
发表于 前天 03:43 | 查看: 7| 回复: 0

在Java中,每一个对象都内置了一套监视器方法,如wait()notify()notifyAll(),它们与synchronized关键字配合使用,可以实现基础的线程间等待与通知机制。而Condition接口位于java.util.concurrent.locks包中,它提供了一套能力更强大的线程协作工具。与Lock接口配合使用,Condition不仅能实现Object.wait()/notify()的功能,还支持多个独立的等待条件,使得在复杂的并发编程场景下,线程的调度与管理更加精细和灵活。

Condition 使用介绍

Condition对象需要通过Lock对象来创建(调用Lock.newCondition()方法)。调用Conditionawait()signal()方法前,必须已经持有与之关联的锁。当线程调用await()方法后,它会释放锁并进入等待状态;直到其他线程调用signal()方法进行通知,并且该等待线程成功重新竞争到锁之后,才会从await()方法返回。

下面我们以经典的生产者-消费者模型为例,展示如何使用ConditionReentrantLock来实现一个线程安全的缓冲区。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumerExample {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // 缓冲区非空条件
    private final Condition notFull = lock.newCondition();  // 缓冲区未满条件
    private final int[] buffer = new int[5];
    private int count = 0; // 当前元素数量
    private int putIndex = 0;
    private int takeIndex = 0;

    // 生产者方法
    public void produce(int item) throws InterruptedException {
        lock.lock();
        try {
            // 使用 while 而不是 if 判断条件,防止虚假唤醒
            while (count == buffer.length) {
                // 缓冲区满,等待 notFull 条件
                // await() 会自动释放锁,并在被唤醒后重新获取锁
                notFull.await();
            }
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % buffer.length;
            count++;
            System.out.println("Produced: " + item + ", count=" + count);
            // 通知消费者:缓冲区非空
            // signal() 只唤醒一个等待线程,signalAll() 则唤醒所有
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 消费者方法
    public int consume() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                // 缓冲区空,等待 notEmpty 条件
                notEmpty.await();
            }
            int item = buffer[takeIndex];
            takeIndex = (takeIndex + 1) % buffer.length;
            count--;
            System.out.println("Consumed: " + item + ", count=" + count);
            // 通知生产者:缓冲区未满
            notFull.signal();
            return item;
        } finally {
            // 在finally块中释放锁,确保异常情况下也能解锁
            lock.unlock();
        }
    }

    // 测试
    public static void main(String[] args) {
        ProducerConsumerExample pc = new ProducerConsumerExample();
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    pc.produce(i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread consumer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    pc.consume();
                    Thread.sleep(150);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        consumer.start();
    }
}

在这个例子中,我们创建了两个Condition对象:notFullnotEmpty。生产者线程在缓冲区满时等待notFull条件,消费者线程在缓冲区空时等待notEmpty条件。这种设计比单个等待队列(如使用Object.wait())的逻辑更清晰,也更高效,避免了不必要的唤醒。实际上,JUC中的BlockingQueue正是基于类似的原理实现的。

Condition 原理

在JUC(java.util.concurrent)框架中,Condition并非独立存在,它与Lock接口及AQS(AbstractQueuedSynchronizer)框架深度集成。Condition接口的核心实现是ConditionObject,它是AQS的一个内部类。

ConditionObject类图

每个ConditionObject实例都维护着一个单向的FIFO等待队列。队列中的节点类型与AQS同步队列的节点类型相同,都是AbstractQueuedSynchronizer.Node。当一个线程调用Condition.await()时,它就会被封装成节点加入这个等待队列。

Condition等待队列

这里有一个重要的概念区别:使用内置synchronized锁时,一个对象关联一个同步队列和一个等待队列。而基于AQS的Lock(如ReentrantLock)则拥有一个同步队列多个Condition创建的等待队列

AQS同步队列与Condition等待队列

await() 等待过程

当一个已经持有锁的线程调用Condition.await()时,会发生以下事情:

  1. 构造新节点,将其加入当前Condition等待队列的尾部。
  2. 完全释放当前线程持有的锁(包括重入次数),并唤醒AQS同步队列中的后继节点。
  3. 当前线程进入阻塞状态,等待被唤醒。

从队列视角看,await()操作相当于将AQS同步队列的首节点(即持有锁的节点)移动到Condition的等待队列尾部。

await()方法队列操作示意图

await() 核心源码逻辑(JDK 8):

public final void await() throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    // 1. 将当前线程加入Condition等待队列
    Node node = addConditionWaiter();
    // 2. 完全释放锁,并保存锁状态(用于后续重入)
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    // 3. 循环检查是否已被转移到同步队列,若未转移则阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 4. 在同步队列中重新竞争获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理取消的节点,防止内存泄漏
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

signal() 通知过程

调用signal()的线程必须已经持有与Condition关联的锁。该方法会执行以下操作:

  1. 获取Condition等待队列中的第一个(等待最久的)节点。
  2. 将该节点从等待队列移动到AQS的同步队列尾部。
  3. 调用LockSupport.unpark()唤醒该节点对应的线程。

被唤醒的线程会从await()方法内的LockSupport.park()处返回,然后进入while (!isOnSyncQueue(node))循环,此时因为节点已在同步队列中,循环条件不成立,跳出循环。接着,它会调用acquireQueued()方法在同步队列中尝试重新竞争锁。只有成功获取到锁之后,才会从await()方法正式返回。

signalAll()方法则是对等待队列中的每一个节点都执行一次signal()操作,将它们全部移动到同步队列中并唤醒。

从队列视角看,signal()相当于从Condition等待队列取出头节点,重新加入AQS同步队列参与锁竞争。

signal()方法队列操作示意图

signal() 核心源码逻辑(JDK 8):

public final void signal() {
    // 检查调用者是否持有独占锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 将头节点从等待队列中移除
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        // 尝试将节点转移到同步队列,成功则结束
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

Object Monitor 与 Condition 对比

下表详细对比了两种线程协作机制的主要差异:

对比维度 Object 监视器(synchronized + wait/notify) Condition(ReentrantLock + Condition)
所属机制 JVM内置原生支持 JUC包提供的显式Lock接口机制
锁获取方式 隐式,通过synchronized自动管理 显式,需手动调用lock()/unlock()
条件队列数量 每个对象仅有1个等待队列 每个Lock可创建多个独立Condition等待队列
条件等待方法 wait(), wait(long), wait(long, int) await(), awaitNanos(), awaitUntil(), awaitUninterruptibly()
唤醒方法 notify(), notifyAll() signal(), signalAll()
中断响应 wait()可被中断并抛异常 提供可中断(await())、不可中断(awaitUninterruptibly())等多种选择
锁释放行为 调用wait()时释放锁,唤醒后自动重入 调用await()时完全释放锁(含重入次数),返回前需重新竞争
使用前提 必须在synchronized同步块内调用 必须在持有对应Lock锁的情况下调用
公平性支持 不支持 ReentrantLock可配置为公平锁
典型应用场景 简单的线程间协作 复杂的多条件协作(如线程池、阻塞队列等)



上一篇:PostgreSQL JSONB实战避坑:Go服务性能与查询优化指南
下一篇:YOLOv5网络架构深度解析:以v6.0 P5模型为例
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2025-12-11 03:43 , Processed in 0.081581 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表