在并发编程的多线程协作场景中,生产者-消费者模型堪称最经典的设计模式之一。该模式的核心在于将数据生成与数据处理解耦:生产者负责生产数据,消费者负责处理数据。
为了安全、高效地在生产者与消费者之间传递数据,Java 并发包提供了 java.util.concurrent.BlockingQueue 接口及其多种实现,即阻塞队列。它不仅极大地简化了线程间的数据通信逻辑,还通过内置的阻塞与唤醒机制,避免了开发者手动使用 wait/notify 的复杂性,是构建健壮并发系统的关键组件。
BlockingQueue 接口详解
BlockingQueue 是一个线程安全的队列接口,继承自 Queue。其最核心的特性是:当队列满时,插入(入队)操作会被阻塞;当队列空时,移除(出队)操作会被阻塞。
为了满足不同场景下的操作需求,该接口定义了几类方法:
核心方法分类
| 操作类型 |
抛异常 |
返回特殊值 |
阻塞 |
超时 |
| 插入(入队) |
add(e) |
offer(e) |
put(e) |
offer(e, timeout, unit) |
| 移除(出队) |
remove() |
poll() |
take() |
poll(timeout, unit) |
| 查看队首元素 |
element() |
peek() |
— |
— |
注意:BlockingQueue 不允许插入 null 元素,尝试插入将抛出 NullPointerException。这通常用于在像 poll 这样的操作中作为“队列为空”的特殊返回值。
典型使用示例:生产者消费者模式
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 生产者线程
new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put("Task-" + i); // 队列满时会自动阻塞等待
System.out.println("Produced: Task-" + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
while (true) {
String task = queue.take(); // 队列空时会自动阻塞等待
System.out.println("Consumed: " + task);
Thread.sleep(100); // 模拟处理耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
此示例清晰地展示了如何使用 BlockingQueue 轻松实现线程间协作,无需复杂的并发控制代码。
BlockingQueue 常用实现类原理分析
Java 提供了多种 BlockingQueue 实现,各有其适用场景。理解其内部原理有助于我们在实际开发中做出最佳选择。
| 实现类 |
是否有界 |
数据结构 |
特性说明 |
ArrayBlockingQueue |
有界 |
数组 |
FIFO,内部使用单锁,可选公平/非公平锁 |
LinkedBlockingQueue |
可选有界 |
链表 |
默认无界(Integer.MAX_VALUE),采用双锁提升并发 |
PriorityBlockingQueue |
无界 |
二叉堆(数组) |
元素按优先级排序 |
DelayQueue |
无界 |
基于优先级队列 |
元素需延迟到期才能被取出 |
下面我们基于 JDK 8 源码,深入剖析这几个核心实现类的设计。
ArrayBlockingQueue:基于数组的单锁实现
核心数据结构与成员变量
ArrayBlockingQueue 使用一个定长的循环数组作为底层存储,并通过一个可重入锁(ReentrantLock)及两个条件变量(Condition)来实现线程安全。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 数据结构:定长循环数组
final Object[] items;
// 下一个要取的索引(队头)
int takeIndex;
// 下一个要放的索引(队尾)
int putIndex;
// 当前队列中的元素个数
int count;
/******* 并发控制:单锁 + 双条件 *******/
// 使用单个 ReentrantLock 保证所有操作的线程安全
final ReentrantLock lock;
// notEmpty 条件:当队列空时,take 操作在此等待
private final Condition notEmpty;
// notFull 条件:当队列满时,put 操作在此等待
private final Condition notFull;
}
put 操作流程
public void put(E e) throws InterruptedException {
checkNotNull(e); // 检查非null
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 可中断地获取锁
try {
while (count == items.length) // 队列已满
notFull.await(); // 在 notFull 条件上等待,释放锁
enqueue(e); // 执行入队逻辑
notEmpty.signal(); // 入队成功,唤醒一个在 notEmpty 上等待的消费者
} finally {
lock.unlock(); // 最终释放锁
}
}
take 操作流程
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 队列为空
notEmpty.await(); // 在 notEmpty 条件上等待
return dequeue(); // 执行出队逻辑,并会唤醒等待的生产者
} finally {
lock.unlock();
}
}
// 出队辅助方法
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null; // 帮助GC
if (++takeIndex == items.length) // 循环数组处理
takeIndex = 0;
count--;
// ... 省略迭代器维护代码
notFull.signal(); // 出队成功,队列有空位,唤醒一个在 notFull 上等待的生产者
return x;
}
工作原理总结:
- 数据结构:定长循环数组。
- 并发控制:采用单个
ReentrantLock 结合两个 Condition(notEmpty 和 notFull)实现精准的等待/通知机制。这种经典的 [ReentrantLock](/f/34-1) 加 Condition 的使用模式,是理解 BlockingQueue 的基础。
LinkedBlockingQueue:基于链表的双锁实现
LinkedBlockingQueue 采用了“双锁队列”的优化设计,将入队锁(putLock)和出队锁(takeLock)分离,从而允许生产者和消费者在大多数情况下并发操作,极大提升了吞吐量。
核心数据结构与成员变量
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 链表节点定义
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity; // 队列容量,默认为 Integer.MAX_VALUE(逻辑无界)
private final AtomicInteger count = new AtomicInteger(); // 当前元素数量,原子类保证可见性
transient Node<E> head; // 链表的虚拟头节点
private transient Node<E> last; // 链表尾节点
/********* 并发控制:双锁分离 *******/
private final ReentrantLock takeLock = new ReentrantLock(); // 出队锁
private final Condition notEmpty = takeLock.newCondition(); // 与出队锁绑定的条件
private final ReentrantLock putLock = new ReentrantLock(); // 入队锁
private final Condition notFull = putLock.newCondition(); // 与入队锁绑定的条件
}
put 操作核心逻辑
- 获取
putLock。
- 检查队列是否已满(
count.get() == capacity),若满则在 notFull 条件上等待。
- 将新节点加入链表尾部(
enqueue)。
- 原子递增
count。如果递增后队列仍未满,则唤醒另一个可能正在 notFull 上等待的生产者(“级联通知”优化)。
- 释放
putLock。
- 如果插入前队列为空(
c == 0),则调用 signalNotEmpty() 去获取 takeLock 并唤醒一个在 notEmpty 上等待的消费者。
take 操作核心逻辑
- 获取
takeLock。
- 检查队列是否为空(
count.get() == 0),若空则在 notEmpty 条件上等待。
- 移除链表头节点并返回数据(
dequeue)。
- 原子递减
count。如果递减后队列仍不为空,则唤醒另一个可能正在 notEmpty 上等待的消费者(“级联通知”优化)。
- 释放
takeLock。
- 如果取出前队列是满的(
c == capacity),则调用 signalNotFull() 去获取 putLock 并唤醒一个在 notFull 上等待的生产者。
工作原理总结:
- 数据结构:单向链表。
- 并发控制:采用双锁分离设计(
putLock & takeLock),配合原子计数器 count。生产者之间、消费者之间会竞争各自的锁,但生产者和消费者可以并行,这是其高吞吐量的关键。需注意其默认无界,使用时要警惕内存溢出(OOM)风险。
PriorityBlockingQueue:无界优先级队列
这是一个支持优先级排序的无界阻塞队列。底层基于一个可自动扩容的数组实现的最小堆(或最大堆,取决于比较器)。
核心数据结构与成员变量
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private transient Object[] queue; // 存储堆的数组
private transient int size; // 元素数量
private transient Comparator<? super E> comparator; // 比较器
/********* 并发控制 *******/
private final ReentrantLock lock; // 单锁控制所有操作
private final Condition notEmpty; // 仅需一个条件,因为队列无界,put永不阻塞
}
put/offer 操作核心
由于是无界队列,put 直接调用永不阻塞的 offer。
- 获取锁。
- 检查数组容量,若满则扩容(
tryGrow)。
- 根据是否有自定义
comparator,执行 siftUpComparable 或 siftUpUsingComparator 方法,将新元素从堆底“上滤”到合适位置,以维持堆序性质。
- 递增
size。
- 唤醒一个在
notEmpty 上等待的消费者。
- 释放锁。
take 操作核心
- 获取锁。
- 循环尝试调用
dequeue() 方法(该方法会执行堆顶的“下滤”操作以维持堆序)。
- 若
dequeue() 返回 null(即队列空),则在 notEmpty 条件上等待。
- 获取到元素后释放锁并返回。
工作原理总结:
- 数据结构:基于数组的平衡二叉堆。
- 排序:依赖元素的自然顺序(实现
Comparable)或构造时传入的 Comparator。
- 并发控制:单锁+单条件(
notEmpty)。put 操作永不阻塞,这是其“无界”特性的体现。
DelayQueue:延迟队列
DelayQueue 是一个无界阻塞队列,用于存放实现了 Delayed 接口的元素。元素只有在指定的延迟时间到期后才能被取出。它是实现定时任务调度、缓存过期等功能的理想选择。
核心思想:内部封装了一个 PriorityBlockingQueue<Delayed>,队列根据元素的 getDelay() 返回值进行排序,延迟最短(最先到期)的元素排在队首。
核心数据结构与成员变量
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>(); // 内部的优先级队列
// Leader-Follower 模式优化:减少不必要的定时等待
private Thread leader = null;
private final Condition available = lock.newCondition();
}
// 元素必须实现的接口
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit); // 返回剩余的延迟时间
}
put/offer 操作核心
- 获取锁。
- 调用
q.offer(e) 将元素插入内部优先级队列。
- 关键优化:检查新插入的元素是否成为了新的队首(
q.peek() == e)。如果是,意味着新元素的到期时间比之前的队首更早,之前设置的等待时间已失效。此时,将 leader 线程设为 null,并唤醒一个等待线程(available.signal()),让它重新检查最新的延迟时间。
- 释放锁。
take 操作核心
这是 DelayQueue 最精妙的部分,采用了 Leader-Follower 模式 来优化大量线程同时等待时的性能。
- 获取锁。
- 进入无限循环:
- 查看队首元素
first。
- 如果队首为空,线程在
available 上无限等待。
- 如果队首非空,计算其剩余延迟时间
delay。
- 若
delay <= 0,说明已到期,直接调用 q.poll() 返回。
- 若
delay > 0,说明需要等待。此时,如果已有其他线程是 leader,则当前线程作为 follower 无限等待 (available.await())。如果没有 leader,则当前线程成为 leader,并执行精确时间的等待 (available.awaitNanos(delay))。
- 在
finally 块中释放锁之前,如果当前没有 leader 且队列非空,则唤醒一个等待线程来接任新的 leader。
工作原理总结:
- 排序依据:元素的
getDelay() 方法返回值。
- 阻塞逻辑:
take() 会阻塞直到队首元素延迟到期。
- 并发优化:引入 Leader-Follower 模式,确保在多个消费者等待时,只有一个线程(
leader)进行精确的定时等待,其余线程(follower)进行无期限等待。这避免了大量线程同时进行 Thread.sleep 或定时 Condition.await 带来的不必要性能开销,是高效处理定时任务队列的关键设计。理解这种模式对于设计高性能的 [Java](/f/28-1) 服务至关重要。