找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2442

积分

0

好友

343

主题
发表于 5 小时前 | 查看: 1| 回复: 0

场景引入

正如标题所言,ArrayBlockingQueue是一个阻塞队列的数组实现。如果要生动地描述阻塞队列的应用场景,餐厅取餐、出餐的场景非常合适。

试想在学校的窗口取餐,一个窗口是预制好的烤肉拌饭,另一个窗口则是现炒的小炒菜,要如何选择?

先把目光移向烤肉拌饭。因为出餐太快,预制好的菜已经放满了窗台前,出餐阿姨只能在窗口闲等,等哪个着急的学生过来拿走一份饭,她才能继续向上摆。

再看向小炒菜。由于味道美味,很多学生都想吃,但炒菜的出餐量有限,学生没法立即享受到美味,来晚的只能排队等待。哪怕学生再想吃,窗台前都是空的,没得拿,只有出餐了才能拿。

分割线

这就是阻塞队列的典型场景。烤肉拌饭的窗台口已经摆满饭了,阿姨只能阻塞等待有空了再放餐(对应put操作)。小炒菜的窗口太火爆了,学生要想吃只能阻塞等待,等轮到自己了再取餐(对应take操作)。

API介绍

放餐操作:add、offer、put

  • add: 如果队列容量未满,则插入指定元素;成功时返回 true,若当前无空位则抛出 IllegalStateException
  • offer: 如果队列容量未满,则插入指定元素;成功时返回 true,若当前无空位则返回 false
  • put: 如果队列容量未满,则插入指定元素;如果满了,则阻塞等待有空位。如果等待时被打断,则抛异常。
  • offer(long timeout, TimeUnit unit): 超时等待的put()。如果等到空位返回true,如果超过timeout还是没有空位,返回false

取餐操作:take、poll

  • poll: 如果队列容量不为空,则取出队列头部并返回。如果为空返回null
  • take: 如果队列容量不为空,则取出队列头部并返回。如果为空则阻塞等待。
  • poll(long timeout, TimeUnit unit): 超时等待的take(),如果队列容量不为空,则取出队列头部并返回。如果为空则阻塞等待指定时间。如果超过时间还为空,则返回null

源码分析

ArrayBlockingQueue的精髓在于对阻塞等待的处理。本文将详细分析阻塞takeput相关的源码,理解“阻塞”的核心实现。

在此之前,我们需要先理解它的底层结构。从名字来看,这个阻塞队列是以Array为基础的,因此“有界”就很好理解。它使用数组作为底层存储,而数组的分配必须是连续的内存空间,这段连续空间的大小就是阻塞队列的容量。

ArrayBlockingQueue的items数组定义代码

同时,开篇也提到了,这样的阻塞队列必须是线程安全的。它底层使用了 ReentrantLock 来保证线程的安全。

ArrayBlockingQueue的锁与条件变量定义代码

为什么还需要Condition呢?等我们看下面的源码就能明白了。这正是Java并发包(JUC)中实现线程间协调通信的关键机制。

enqueue、dequeue:入队出队的底层

//入队操作
private void enqueue(E e) {
    final Object[] items = this.items;
    items[putIndex] = e;
    if (++putIndex == items.length)
        putIndex = 0; //如果当前元素入队后到达数组末尾,重置索引,下一次再从数组头部开始(循环队列)
    count++;
    notEmpty.signal(); //如果元素加入到空队列中,则唤醒正在等待获取元素的线程
}

这里有一个细节:为什么不直接操作this.items,反而要先定义一个局部变量 final Object[] items = this.items 来接收它?

一、避免多次访问堆内存,提升执行效率

首先要明确 Java 的内存访问特性:

  • this.items 是类的成员变量,存储在堆内存中。每次访问 this.items 都需要通过 this 引用(堆地址)去寻址,相对耗时。
  • 局部变量 items 存储在栈内存中。栈内存的访问速度远快于堆内存,而且局部变量的寻址是直接的,无需额外间接引用。

enqueue 方法中多次用到了 itemsitems[putIndex] = eputIndex == items.length)。如果直接用 this.items,会多次触发堆内存寻址。而先把 this.items 赋值给局部变量,后续只需要访问栈上的局部变量即可,减少了堆内存访问次数,提升了执行效率。这种优化技巧在追求性能的Java源码中很常见。

二、防止指令重排导致的可见性问题,保证线程安全

ArrayBlockingQueue 是线程安全类,items 是共享成员变量。虽然有 ReentrantLock 保证原子性,但 Java 虚拟机的指令重排可能会导致潜在的可见性问题。

简单说:final 局部变量相当于给 items 数组引用拍了一张 “快照”,确保整个 enqueue 方法执行期间,使用的是同一个数组引用,不会因为 JVM 的优化而读取到错误或过期的数组对象引用,从而进一步增强了线程安全性。

//出队操作
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E e = (E) items[takeIndex]; //取出队列头部元素
    items[takeIndex] = null; //将原位置设为null,帮助GC
    if (++takeIndex == items.length)
        takeIndex = 0; //与入队对应,到达末尾后回到数组头部
    count--;
    if (itrs != null)
        itrs.elementDequeued(); //通知所有活跃迭代器,元素已被移除
    notFull.signal(); //唤醒因队列满而阻塞等待的线程,队列现在有空位了
    return e;
}

itrs是干什么的?

itrsArrayBlockingQueue 用来管理当前所有活跃迭代器(Iterator)的集合。目的是保证迭代器的弱一致性,避免迭代过程中出现异常或错误的元素引用。

当队列执行 dequeue 出队操作(删除队首元素)时,会调用 itrs.elementDequeued()。核心目的是通知所有活跃迭代器:“某个元素被删除了,你们需要更新自己的状态,避免遍历出错”。

void elementDequeued() {
    // assert lock.isHeldByCurrentThread();
    if (count == 0)
        queueIsEmpty();
    else if (takeIndex == 0)
        takeIndexWrapped();
}
  • queueIsEmpty():在本次出队后队列变空时,重置迭代器状态(nextIndex=-1remaining=0),标记迭代器遍历完毕,避免无效遍历。
  • takeIndexWrapped():在本次出队后 takeIndex 循环回绕到数组开头时,修正迭代器的状态,保证迭代器能正确适配队列的循环结构。

通过图可能能够更好理解入队、出队时数组索引的循环变化,这本质是一种循环队列数据结构

队列的enqueue与dequeue操作示意图

put、take:阻塞入队出队的实现

理解了底层的enqueuedequeue,我们来看最核心的阻塞方法puttake是如何利用锁和条件变量实现的。

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 获取锁,但允许被中断
    try {
        while (count == items.length)
            notFull.await(); // 如果队列满了,就在notFull条件上阻塞等待
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await(); // 如果队列是空的,就在notEmpty条件上阻塞等待
        return dequeue();
    } finally {
        lock.unlock();
    }
}

可以看到,ArrayBlockingQueue 使用一把 ReentrantLock 保证了入队和出队操作的线程安全。同时,利用两个条件变量(notFullnotEmpty)实现了经典的生产者-消费者模型:当队列满时,生产者线程在 notFull 上等待;当队列空时,消费者线程在 notEmpty 上等待。每当成功入队或出队后,就会通过 signal() 唤醒在对应条件上等待的线程。这正是构建高并发、线程安全组件的典型架构模式。

思考

  1. 可以看到,takeput操作使用的都是同一把ReentrantLock,这说明这两个操作是会互相阻塞的(即入队和出队不能同时进行)。怎么样才能让这两个操作独立起来,实现更高的并发度?

  2. 这个数组阻塞队列是强制有界的,必须在构造时指定容量。如果不确定队列大小的情况下,或者希望队列“无界”,该怎么处理?

这两个问题在 LinkedBlockingQueue 中都给出了不同的答案。它使用了两把锁(“锁分离”技术)来分别保护队列的头部和尾部,从而让入队和出队操作在大部分情况下可以并行,并且可以构造为“理论上无界”的队列。有兴趣的开发者可以自行对比了解。


本文内容首发于 云栈社区,一个专注于开发者成长与交流的技术社区。




上一篇:Oracle OCP认证备考指南:三大高频易错考点详解
下一篇:Java双层for循环性能优化:从O(n²)到O(n)的业务数据处理实践
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-18 16:46 , Processed in 0.223011 second(s), 39 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表