找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2345

积分

0

好友

311

主题
发表于 1 小时前 | 查看: 2| 回复: 0

C++编程与多线程概念示意图

生产者-消费者模型是并发编程里最经典的模型之一,一个线程往队列里塞数据,另一个线程从队列里取数据,听起来简单得不能再简单了。

但你真写起来就会发现,问题全在“等”这个字上——队列空的时候消费者怎么等?队列满的时候生产者怎么等?程序要退出的时候怎么让正在等的线程醒过来?

这篇文章就围绕“怎么等”这一个问题,从最朴素的轮询开始,一步步推导到 condition_variable 的正确用法,把每个关键细节都拆给你看。

一、轮询:最直觉的方案,也是最烂的方案

假设我们有一个共享队列,生产者往里 push,消费者往外 pop,两边用一把 mutex 保护。最直觉的写法大概长这样:

std::queue<int> q;
std::mutex mtx;

// 生产者
void producer() {
    for (int i = 0; i < 100; ++i) {
        std::lock_guard<std::mutex> lock(mtx);
        q.push(i);
    }
}

// 消费者
void consumer() {
    while (true) {
        std::lock_guard<std::mutex> lock(mtx);
        if (!q.empty()) {
            int val = q.front();
            q.pop();
            // 处理 val ...
        }
    }
}

看起来挺对的?逻辑上没毛病——加锁、检查队列、有数据就取、没数据就继续循环。

但你跑起来就知道问题有多大了。打开任务管理器你会发现,消费者线程的 CPU 占用率直接拉满。它在干嘛?什么有用的事都没干,就是在不停地加锁、检查、发现队列是空的、解锁、再加锁、再检查……每秒执行几百万次无意义的循环,这就是忙等(busy waiting)

更要命的是,消费者不停抢锁还会影响生产者——生产者想 push 数据的时候发现锁总是被消费者占着,push 的效率也跟着下降。一个线程空转把两个线程都拖慢了。

加个 sleep 行不行?

很多人的第一反应是加个 sleep

void consumer() {
    while (true) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            if (!q.empty()) {
                int val = q.front();
                q.pop();
                // 处理 val ...
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

CPU 占用是降下来了,但引入了新问题:延迟

生产者刚 push 进去一个数据,消费者可能还在睡觉,最多要等 10ms 才能醒过来。你把 sleep 时间调短,延迟是低了但 CPU 又上去了;调长,CPU 是省了但延迟又不可接受。这是一个无解的跷跷板——你没办法同时做到低延迟和低 CPU 占用

除非,有一种机制能让消费者在队列为空时真正睡过去,等生产者 push 数据之后再精准地把它叫醒。

这就是条件变量干的事。

二、条件变量:让线程睡着等,醒了就有活干

std::condition_variable 提供了两个核心操作:

  • wait():让当前线程释放锁并挂起,进入内核等待队列,不消耗 CPU
  • notify_one()/ notify_all():唤醒一个或全部等待中的线程

用条件变量改写消费者:

std::queue<int> q;
std::mutex mtx;
std::condition_variable cv;

// 生产者
void producer() {
    for (int i = 0; i < 100; ++i) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            q.push(i);
        }
        cv.notify_one();  // 通知消费者:有数据了
    }
}

// 消费者
void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&]{ return !q.empty(); });
        int val = q.front();
        q.pop();
        lock.unlock();
        // 处理 val ...
    }
}

对比一下轮询版本,消费者的核心逻辑从“死循环检查”变成了一行 cv.wait(lock, predicate)——队列为空时线程直接挂起,零 CPU 消耗;生产者 push 数据后调用 notify_one(),消费者几乎立刻被唤醒,延迟在微秒级别。

低延迟和低 CPU 占用,条件变量同时做到了。

底层发生了什么?

在 Linux 上,condition_variable 底层用的是 futex(Fast Userspace muTEX)系统调用。当线程调用 wait() 的时候,它会通过 futex(FUTEX_WAIT) 陷入内核,内核把这个线程从运行队列摘掉,挂到一个等待队列上,然后调度器去执行别的线程。这个等待中的线程不占用任何 CPU 时间片。

当另一个线程调用 notify_one() 的时候,底层调用 futex(FUTEX_WAKE),内核从等待队列上摘下一个线程放回运行队列,调度器会在合适的时机恢复它的执行。

整个过程是操作系统级别的精准调度,跟用户态的 sleep 轮询完全不是一个量级。其他平台的底层实现不同(macOS 基于 psynch,Windows 基于 SRWLock+ CONDITION_VARIABLE),但 C++11 标准库屏蔽了这些差异,上层行为完全一致。

三、虚假唤醒:为什么 wait 必须带谓词

你可能注意到了,上面的代码里 wait 带了一个 lambda:

cv.wait(lock, [&]{ return !q.empty(); });

而不是简单的:

cv.wait(lock);

这不是可选的优化,这是必须的。原因有两个。

原因一:虚假唤醒(spurious wakeup)

C++ 标准明确允许 condition_variable::wait() 在没有收到 notify 的情况下自行返回。这不是 bug,是标准允许的行为——因为某些平台的底层实现(比如 POSIX pthread)就是这样设计的,强制消除虚假唤醒的成本太高,不如让调用方自己检查条件。

如果你不带谓词直接 wait(lock),虚假唤醒后线程醒了但队列还是空的,你直接 q.front() 就是未定义行为。

原因二:丢失唤醒(lost wakeup)

condition_variable 的通知不会排队——notify 时如果没有线程正在 wait,这个通知就直接丢了,不会留给后来者。

假设消费者不检查条件就直接 wait

std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock);  // 不检查队列,直接等通知
int val = q.front();
q.pop();

如果生产者在消费者进入 wait() 之前就已经 push 了数据并 notify_one(),通知打了个空——没人在等。消费者随后才进入 wait(),傻等一个已经来过的通知,永远醒不过来。这就是丢失唤醒。

你可能会想,那我在 wait 前加个 if 检查不就行了?

std::unique_lock<std::mutex> lock(mtx);
if (q.empty()) {
    cv.wait(lock);  // 只检查一次
}
int val = q.front();  // 危险!
q.pop();

单消费者场景下 if 确实能防住丢失唤醒(因为检查和 wait 全程持锁,生产者插不进来),但在多消费者场景下还是不够:Consumer A 和 B 都在 wait(),生产者 push 一条数据后 notify_one(),A 先醒来取走了数据;如果 B 因虚假唤醒也醒了过来,队列已空,直接 q.front() 就是未定义行为。用 if 只检查一次不够,必须用循环反复确认条件。

带谓词的 wait(lock, pred) 本质上等价于:

while (!pred()) {
    cv.wait(lock);
}

每次被唤醒(不管是真唤醒还是虚假唤醒)都会重新检查条件,而且进入 wait 之前也会先检查——如果条件已经满足就根本不会睡下去,完美解决丢失唤醒的问题。

一句话:永远用带谓词的 wait,没有例外。

四、unique_lock 和 lock_guard:为什么条件变量只认 unique_lock

你可能已经注意到,消费者用的是 unique_lock 而不是 lock_guard,这不是随便选的。

condition_variable::wait() 内部需要干两件事:

  1. 先释放锁——不释放的话生产者永远拿不到锁,数据也 push 不进来
  2. 被唤醒后重新加锁——保证醒来时依然持有锁,可以安全地操作共享数据

lock_guard 的设计哲学是“构造时加锁,析构时解锁,中间不能动”,它压根没有 unlock()lock() 方法,编译器都不让你调,wait() 自然没法用它来释放和重新获取锁。

unique_lock 就灵活多了,它支持中途 unlock()lock()wait() 正好需要这个能力——先 unlock 让生产者能拿到锁去 push 数据,被唤醒后再 lock 回来保证醒来时依然持有锁。这也是为什么 condition_variable::wait() 的参数类型写死了 std::unique_lock<std::mutex>&,编译器层面就强制你用对。

简单记: 凡是涉及 condition_variable 的地方,一律用 unique_lock。不涉及的地方用 lock_guard 就够了,它更轻量(虽然实际性能差异可以忽略不计)。

五、优雅停止:怎么让 wait 中的线程退出

到目前为止,消费者线程有个致命问题——它停不下来。

void consumer() {
    while (true) {  // 永远循环
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&]{ return !q.empty(); });  // 队列空就一直等
        // ...
    }
}

如果生产者已经不会再 push 数据了,消费者会永远阻塞在 wait() 上。你调用 thread::join() 等它结束?等到天荒地老也等不来。

解法是加一个 stop 标志,把它塞进 wait 的谓词里:

bool stop_flag = false;  // 普通 bool,靠 mutex 保护

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, [&]{ return !q.empty() || stop_flag; });

        if (stop_flag && q.empty()) {
            break;  // 收到停止信号且队列已排空,退出
        }

        int val = q.front();
        q.pop();
        lock.unlock();
        // 处理 val ...
    }
}

停止的时候:

void request_stop() {
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop_flag = true;  // 必须在锁内设置
    }
    cv.notify_all();  // 必须用 notify_all,唤醒所有等待中的消费者
}

注意 stop_flag 用的是普通 bool 而不是 atomic<bool>——因为它的读写全都在 mtx 的保护下,原子性由锁保证。反过来说,如果你用 atomic<bool> 并且 request_stop() 里不加锁就直接 store(true)notify_all(),是有 bug 的:consumer 可能刚检查完谓词(返回 false)还没来得及进入 wait(),这时你的 notify_all() 就打了个空——没有线程在等,通知直接丢掉,consumer 随后进入 wait() 就再也醒不过来了。加锁能堵住这个窗口:consumer 持锁检查谓词期间,你的 request_stop() 会卡在锁上等着;等 consumer 进了 wait() 释放锁之后你才能设标志并 notify,这样通知一定能送达。

除了上面说的“必须加锁”之外,还有几个容易踩的坑:

1. 谓词里必须包含 stop 条件。 如果谓词还是 !q.empty(),设了 stop_flag 消费者也醒不过来——谓词不满足,wait 检查一遍发现条件还是 false,又接着睡。

2. 停止时必须用 notify_all() 而不是 notify_one() 如果有多个消费者线程都在 wait()notify_one() 只唤醒一个,其他的永远收不到停止信号,你的 join() 会死等。

3. 先设标志再 notify,顺序不能反。 反了的话,消费者先被唤醒、检查 stop_flag 发现是 false、又睡回去,然后你才设 true 但已经没有后续的 notify 了——两边都卡死。

4. 退出前是否排空队列取决于业务。 上面的代码在 stop_flag 为 true 时还会继续处理队列里剩余的数据,直到队列为空才真正退出,这在大多数场景下是你想要的行为。如果不需要排空,把条件改成 if (stop_flag) break; 就行。

六、完整实现:一个能停的生产者-消费者

把前面的所有细节合到一起,这是一个完整的、可编译的实现:

#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
#include<vector>

class ProducerConsumer {
public:
    void produce(int value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(value);
        }
        cv_.notify_one();
    }

    // 返回 false 表示已停止且队列为空
    bool consume(int& value) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [this]{ return !queue_.empty() || stop_; });

        if (stop_ && queue_.empty()) {
            return false;
        }

        value = queue_.front();
        queue_.pop();
        return true;
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
    }

private:
    std::queue<int> queue_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
};

int main() {
    ProducerConsumer pc;

    // 启动 2 个生产者
    std::vector<std::thread> producers;
    for (int i = 0; i < 2; ++i) {
        producers.emplace_back([&pc, i]{
            for (int j = 0; j < 5; ++j) {
                pc.produce(i * 100 + j);
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
    }

    // 启动 2 个消费者
    std::vector<std::thread> consumers;
    for (int i = 0; i < 2; ++i) {
        consumers.emplace_back([&pc, i]{
            int val;
            while (pc.consume(val)) {
                std::cout << "Consumer " << i << " got: " << val << "\n";
            }
        });
    }

    // 等生产者结束
    for (auto& t : producers) t.join();

    // 停止消费者
    pc.stop();
    for (auto& t : consumers) t.join();

    std::cout << "All done.\n";
    return 0;
}

这个实现里有几个设计上的讲究,拆开说说:

stop_ 用普通 bool 而不是 atomic。 因为对 stop_ 的所有读写都在 mtx_ 的保护下,不需要额外的原子性保证。反而如果用了 atomic<bool> 却不加锁,可能会跟 wait 的谓词检查产生竞态。stop() 方法里先加锁设 stop_ = truenotify_all(),保证了所有消费者都能看到最新的 stop_ 值。

produce 用 lock_guard,consume 用 unique_lock。 生产者不需要调用 wait()lock_guard 就够用了,简单直接。消费者需要 wait(),必须用 unique_lock

notify_one 在 lock 作用域之外调用。 produce 里先释放锁再 notify_one()。在锁内 notify 也是完全正确的,锁外 notify 可以避免被唤醒的线程立刻去抢一把还没释放的锁。不过现代 glibc(≥ 2.25)已支持 wait morphing 优化——在锁内 notify 时,内核会把线程直接从条件变量的等待队列转移到 mutex 的等待队列,避免无谓的上下文切换,所以两种写法在实践中性能差异已经很小。

七、对比总结

维度 轮询 sleep轮询 condition_variable
CPU 占用 极高(100%) 可控 几乎为零
响应延迟 极低 取决于 sleep 时间 极低(微秒级)
能否优雅停止 可以(检查标志) 可以(有延迟) 可以(notify唤醒)
实现复杂度 中等
生产环境推荐 绝对不要 勉强凑合 标准做法

条件变量凭什么比轮询强?说到底就一句话:它把“等待”从用户态的忙循环下沉到了内核态的线程调度——让操作系统来决定什么时候该醒,而不是线程自己不停地问“到了吗?到了吗?到了吗?”。

这就像餐厅叫号,轮询是你每隔 10 秒跑去前台问“到我了吗”,条件变量是你坐在位子上等广播叫你的号——结果一样,但你和前台都轻松多了。在高并发场景下,正确使用 C++11 提供的 std::condition_variable 是保障系统性能和响应性的基石。

声明:本文是经过严格查阅相关权威文献和资料,形成的专业的可靠的内容。全文数据都有据可依,可回溯。特别申明:数据和资料已获得授权。本文内容,不涉及任何偏颇观点,用中立态度客观事实描述事情本身。




上一篇:从关键词到自然语言:AI搜索如何革新传统搜索体验与面试要点解析
下一篇:从BAT到TMD:为什么做软件比搞硬件更容易“爆富”?
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-3-25 03:50 , Processed in 0.530841 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表