
生产者-消费者模型是并发编程里最经典的模型之一,一个线程往队列里塞数据,另一个线程从队列里取数据,听起来简单得不能再简单了。
但你真写起来就会发现,问题全在“等”这个字上——队列空的时候消费者怎么等?队列满的时候生产者怎么等?程序要退出的时候怎么让正在等的线程醒过来?
这篇文章就围绕“怎么等”这一个问题,从最朴素的轮询开始,一步步推导到 condition_variable 的正确用法,把每个关键细节都拆给你看。
一、轮询:最直觉的方案,也是最烂的方案
假设我们有一个共享队列,生产者往里 push,消费者往外 pop,两边用一把 mutex 保护。最直觉的写法大概长这样:
std::queue<int> q;
std::mutex mtx;
// 生产者
void producer() {
for (int i = 0; i < 100; ++i) {
std::lock_guard<std::mutex> lock(mtx);
q.push(i);
}
}
// 消费者
void consumer() {
while (true) {
std::lock_guard<std::mutex> lock(mtx);
if (!q.empty()) {
int val = q.front();
q.pop();
// 处理 val ...
}
}
}
看起来挺对的?逻辑上没毛病——加锁、检查队列、有数据就取、没数据就继续循环。
但你跑起来就知道问题有多大了。打开任务管理器你会发现,消费者线程的 CPU 占用率直接拉满。它在干嘛?什么有用的事都没干,就是在不停地加锁、检查、发现队列是空的、解锁、再加锁、再检查……每秒执行几百万次无意义的循环,这就是忙等(busy waiting)。
更要命的是,消费者不停抢锁还会影响生产者——生产者想 push 数据的时候发现锁总是被消费者占着,push 的效率也跟着下降。一个线程空转把两个线程都拖慢了。
加个 sleep 行不行?
很多人的第一反应是加个 sleep:
void consumer() {
while (true) {
{
std::lock_guard<std::mutex> lock(mtx);
if (!q.empty()) {
int val = q.front();
q.pop();
// 处理 val ...
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
CPU 占用是降下来了,但引入了新问题:延迟。
生产者刚 push 进去一个数据,消费者可能还在睡觉,最多要等 10ms 才能醒过来。你把 sleep 时间调短,延迟是低了但 CPU 又上去了;调长,CPU 是省了但延迟又不可接受。这是一个无解的跷跷板——你没办法同时做到低延迟和低 CPU 占用。
除非,有一种机制能让消费者在队列为空时真正睡过去,等生产者 push 数据之后再精准地把它叫醒。
这就是条件变量干的事。
二、条件变量:让线程睡着等,醒了就有活干
std::condition_variable 提供了两个核心操作:
wait():让当前线程释放锁并挂起,进入内核等待队列,不消耗 CPU
notify_one()/ notify_all():唤醒一个或全部等待中的线程
用条件变量改写消费者:
std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;
// 生产者
void producer() {
for (int i = 0; i < 100; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
q.push(i);
}
cv.notify_one(); // 通知消费者:有数据了
}
}
// 消费者
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&]{ return !q.empty(); });
int val = q.front();
q.pop();
lock.unlock();
// 处理 val ...
}
}
对比一下轮询版本,消费者的核心逻辑从“死循环检查”变成了一行 cv.wait(lock, predicate)——队列为空时线程直接挂起,零 CPU 消耗;生产者 push 数据后调用 notify_one(),消费者几乎立刻被唤醒,延迟在微秒级别。
低延迟和低 CPU 占用,条件变量同时做到了。
底层发生了什么?
在 Linux 上,condition_variable 底层用的是 futex(Fast Userspace muTEX)系统调用。当线程调用 wait() 的时候,它会通过 futex(FUTEX_WAIT) 陷入内核,内核把这个线程从运行队列摘掉,挂到一个等待队列上,然后调度器去执行别的线程。这个等待中的线程不占用任何 CPU 时间片。
当另一个线程调用 notify_one() 的时候,底层调用 futex(FUTEX_WAKE),内核从等待队列上摘下一个线程放回运行队列,调度器会在合适的时机恢复它的执行。
整个过程是操作系统级别的精准调度,跟用户态的 sleep 轮询完全不是一个量级。其他平台的底层实现不同(macOS 基于 psynch,Windows 基于 SRWLock+ CONDITION_VARIABLE),但 C++11 标准库屏蔽了这些差异,上层行为完全一致。
三、虚假唤醒:为什么 wait 必须带谓词
你可能注意到了,上面的代码里 wait 带了一个 lambda:
cv.wait(lock, [&]{ return !q.empty(); });
而不是简单的:
cv.wait(lock);
这不是可选的优化,这是必须的。原因有两个。
原因一:虚假唤醒(spurious wakeup)
C++ 标准明确允许 condition_variable::wait() 在没有收到 notify 的情况下自行返回。这不是 bug,是标准允许的行为——因为某些平台的底层实现(比如 POSIX pthread)就是这样设计的,强制消除虚假唤醒的成本太高,不如让调用方自己检查条件。
如果你不带谓词直接 wait(lock),虚假唤醒后线程醒了但队列还是空的,你直接 q.front() 就是未定义行为。
原因二:丢失唤醒(lost wakeup)
condition_variable 的通知不会排队——notify 时如果没有线程正在 wait,这个通知就直接丢了,不会留给后来者。
假设消费者不检查条件就直接 wait:
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock); // 不检查队列,直接等通知
int val = q.front();
q.pop();
如果生产者在消费者进入 wait() 之前就已经 push 了数据并 notify_one(),通知打了个空——没人在等。消费者随后才进入 wait(),傻等一个已经来过的通知,永远醒不过来。这就是丢失唤醒。
你可能会想,那我在 wait 前加个 if 检查不就行了?
std::unique_lock<std::mutex> lock(mtx);
if (q.empty()) {
cv.wait(lock); // 只检查一次
}
int val = q.front(); // 危险!
q.pop();
单消费者场景下 if 确实能防住丢失唤醒(因为检查和 wait 全程持锁,生产者插不进来),但在多消费者场景下还是不够:Consumer A 和 B 都在 wait(),生产者 push 一条数据后 notify_one(),A 先醒来取走了数据;如果 B 因虚假唤醒也醒了过来,队列已空,直接 q.front() 就是未定义行为。用 if 只检查一次不够,必须用循环反复确认条件。
带谓词的 wait(lock, pred) 本质上等价于:
while (!pred()) {
cv.wait(lock);
}
每次被唤醒(不管是真唤醒还是虚假唤醒)都会重新检查条件,而且进入 wait 之前也会先检查——如果条件已经满足就根本不会睡下去,完美解决丢失唤醒的问题。
一句话:永远用带谓词的 wait,没有例外。
四、unique_lock 和 lock_guard:为什么条件变量只认 unique_lock
你可能已经注意到,消费者用的是 unique_lock 而不是 lock_guard,这不是随便选的。
condition_variable::wait() 内部需要干两件事:
- 先释放锁——不释放的话生产者永远拿不到锁,数据也 push 不进来
- 被唤醒后重新加锁——保证醒来时依然持有锁,可以安全地操作共享数据
lock_guard 的设计哲学是“构造时加锁,析构时解锁,中间不能动”,它压根没有 unlock() 和 lock() 方法,编译器都不让你调,wait() 自然没法用它来释放和重新获取锁。
unique_lock 就灵活多了,它支持中途 unlock() 和 lock(),wait() 正好需要这个能力——先 unlock 让生产者能拿到锁去 push 数据,被唤醒后再 lock 回来保证醒来时依然持有锁。这也是为什么 condition_variable::wait() 的参数类型写死了 std::unique_lock<std::mutex>&,编译器层面就强制你用对。
简单记: 凡是涉及 condition_variable 的地方,一律用 unique_lock。不涉及的地方用 lock_guard 就够了,它更轻量(虽然实际性能差异可以忽略不计)。
五、优雅停止:怎么让 wait 中的线程退出
到目前为止,消费者线程有个致命问题——它停不下来。
void consumer() {
while (true) { // 永远循环
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&]{ return !q.empty(); }); // 队列空就一直等
// ...
}
}
如果生产者已经不会再 push 数据了,消费者会永远阻塞在 wait() 上。你调用 thread::join() 等它结束?等到天荒地老也等不来。
解法是加一个 stop 标志,把它塞进 wait 的谓词里:
bool stop_flag = false; // 普通 bool,靠 mutex 保护
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&]{ return !q.empty() || stop_flag; });
if (stop_flag && q.empty()) {
break; // 收到停止信号且队列已排空,退出
}
int val = q.front();
q.pop();
lock.unlock();
// 处理 val ...
}
}
停止的时候:
void request_stop() {
{
std::lock_guard<std::mutex> lock(mtx);
stop_flag = true; // 必须在锁内设置
}
cv.notify_all(); // 必须用 notify_all,唤醒所有等待中的消费者
}
注意 stop_flag 用的是普通 bool 而不是 atomic<bool>——因为它的读写全都在 mtx 的保护下,原子性由锁保证。反过来说,如果你用 atomic<bool> 并且 request_stop() 里不加锁就直接 store(true) 再 notify_all(),是有 bug 的:consumer 可能刚检查完谓词(返回 false)还没来得及进入 wait(),这时你的 notify_all() 就打了个空——没有线程在等,通知直接丢掉,consumer 随后进入 wait() 就再也醒不过来了。加锁能堵住这个窗口:consumer 持锁检查谓词期间,你的 request_stop() 会卡在锁上等着;等 consumer 进了 wait() 释放锁之后你才能设标志并 notify,这样通知一定能送达。
除了上面说的“必须加锁”之外,还有几个容易踩的坑:
1. 谓词里必须包含 stop 条件。 如果谓词还是 !q.empty(),设了 stop_flag 消费者也醒不过来——谓词不满足,wait 检查一遍发现条件还是 false,又接着睡。
2. 停止时必须用 notify_all() 而不是 notify_one()。 如果有多个消费者线程都在 wait(),notify_one() 只唤醒一个,其他的永远收不到停止信号,你的 join() 会死等。
3. 先设标志再 notify,顺序不能反。 反了的话,消费者先被唤醒、检查 stop_flag 发现是 false、又睡回去,然后你才设 true 但已经没有后续的 notify 了——两边都卡死。
4. 退出前是否排空队列取决于业务。 上面的代码在 stop_flag 为 true 时还会继续处理队列里剩余的数据,直到队列为空才真正退出,这在大多数场景下是你想要的行为。如果不需要排空,把条件改成 if (stop_flag) break; 就行。
六、完整实现:一个能停的生产者-消费者
把前面的所有细节合到一起,这是一个完整的、可编译的实现:
#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
#include<vector>
class ProducerConsumer {
public:
void produce(int value) {
{
std::lock_guard<std::mutex> lock(mtx_);
queue_.push(value);
}
cv_.notify_one();
}
// 返回 false 表示已停止且队列为空
bool consume(int& value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]{ return !queue_.empty() || stop_; });
if (stop_ && queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void stop() {
{
std::lock_guard<std::mutex> lock(mtx_);
stop_ = true;
}
cv_.notify_all();
}
private:
std::queue<int> queue_;
std::mutex mtx_;
std::condition_variable cv_;
bool stop_ = false;
};
int main() {
ProducerConsumer pc;
// 启动 2 个生产者
std::vector<std::thread> producers;
for (int i = 0; i < 2; ++i) {
producers.emplace_back([&pc, i]{
for (int j = 0; j < 5; ++j) {
pc.produce(i * 100 + j);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
}
// 启动 2 个消费者
std::vector<std::thread> consumers;
for (int i = 0; i < 2; ++i) {
consumers.emplace_back([&pc, i]{
int val;
while (pc.consume(val)) {
std::cout << "Consumer " << i << " got: " << val << "\n";
}
});
}
// 等生产者结束
for (auto& t : producers) t.join();
// 停止消费者
pc.stop();
for (auto& t : consumers) t.join();
std::cout << "All done.\n";
return 0;
}
这个实现里有几个设计上的讲究,拆开说说:
stop_ 用普通 bool 而不是 atomic。 因为对 stop_ 的所有读写都在 mtx_ 的保护下,不需要额外的原子性保证。反而如果用了 atomic<bool> 却不加锁,可能会跟 wait 的谓词检查产生竞态。stop() 方法里先加锁设 stop_ = true 再 notify_all(),保证了所有消费者都能看到最新的 stop_ 值。
produce 用 lock_guard,consume 用 unique_lock。 生产者不需要调用 wait(),lock_guard 就够用了,简单直接。消费者需要 wait(),必须用 unique_lock。
notify_one 在 lock 作用域之外调用。 produce 里先释放锁再 notify_one()。在锁内 notify 也是完全正确的,锁外 notify 可以避免被唤醒的线程立刻去抢一把还没释放的锁。不过现代 glibc(≥ 2.25)已支持 wait morphing 优化——在锁内 notify 时,内核会把线程直接从条件变量的等待队列转移到 mutex 的等待队列,避免无谓的上下文切换,所以两种写法在实践中性能差异已经很小。
七、对比总结
| 维度 |
轮询 |
sleep轮询 |
condition_variable |
| CPU 占用 |
极高(100%) |
可控 |
几乎为零 |
| 响应延迟 |
极低 |
取决于 sleep 时间 |
极低(微秒级) |
| 能否优雅停止 |
可以(检查标志) |
可以(有延迟) |
可以(notify唤醒) |
| 实现复杂度 |
低 |
低 |
中等 |
| 生产环境推荐 |
绝对不要 |
勉强凑合 |
标准做法 |
条件变量凭什么比轮询强?说到底就一句话:它把“等待”从用户态的忙循环下沉到了内核态的线程调度——让操作系统来决定什么时候该醒,而不是线程自己不停地问“到了吗?到了吗?到了吗?”。
这就像餐厅叫号,轮询是你每隔 10 秒跑去前台问“到我了吗”,条件变量是你坐在位子上等广播叫你的号——结果一样,但你和前台都轻松多了。在高并发场景下,正确使用 C++11 提供的 std::condition_variable 是保障系统性能和响应性的基石。
声明:本文是经过严格查阅相关权威文献和资料,形成的专业的可靠的内容。全文数据都有据可依,可回溯。特别申明:数据和资料已获得授权。本文内容,不涉及任何偏颇观点,用中立态度客观事实描述事情本身。