在编写采用多线程的 C++ 程序时,如果每个线程都独立运行,这类程序通常易于实现,代码也通俗易懂。
然而,在实际开发中,不同线程之间的任务存在相互依赖,是一种十分常见的情况。这意味着某些线程必须等待其他线程对共享变量进行修改并发出通知后才能继续执行。此时,我们就需要借助 std::condition_variable 来进行线程间的协调与调度。
本文将通过实例,探讨 std::condition_variable 的基本用法及其需要注意的几个关键问题。
先来看一个基础的使用示例:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
class ThreadDataProcessor {
private:
std::mutex mtx_;
std::condition_variable cv_;
std::string processed_data_;
bool is_data_ready_ = false;
bool is_processing_done_ = false;
public:
void WorkerProcess(){
std::cout << "[Worker Thread] Start" << std::endl;
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() { return is_data_ready_; });
std::cout << "[Worker Thread] Processing data..." << std::endl;
processed_data_ += " after processing";
is_processing_done_ = true;
std::cout << "[Worker Thread] Data processing completed, notify master thread" << std::endl;
lock.unlock();
cv_.notify_one();
}
void MasterControl(){
std::cout << "[Master Thread] Start" << std::endl;
processed_data_ = "Example data";
{
std::lock_guard<std::mutex> lock(mtx_);
is_data_ready_ = true;
std::cout << "[Master Thread] Data is ready, notify worker thread" << std::endl;
}
cv_.notify_one();
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() { return is_processing_done_; });
}
std::cout << "[Master Thread] Final processed data: " << processed_data_ << std::endl;
}
};
int main(){
ThreadDataProcessor processor;
std::thread worker_thread(&ThreadDataProcessor::WorkerProcess, &processor);
std::thread master_thread(&ThreadDataProcessor::MasterControl, &processor);
worker_thread.join();
master_thread.join();
return 0;
}
在这个示例中,主线程(master thread)和工作线程(worker thread)并发运行,两个线程的任务存在明确的依赖关系:主线程完成准备工作后通知工作线程;工作线程处理完数据后,再通知主线程进行后续操作。
程序输出结果如下:
[Worker Thread] Start
[Master Thread] Start
[Master Thread] Data is ready, notify worker thread
[Worker Thread] Processing data...
[Worker Thread] Data processing completed, notify master thread
[Master Thread] Final processed data: Example data after processing
这个例子运行正常,但它掩盖了一个潜在的风险:如果条件变量在等待之前就已经被通知了,会发生什么?
查阅 cppreference 对 notify_one() 的说明,其中提到:
This makes it impossible for notify_one() to, for example, be delayed and unblock a thread that started waiting just after the call to notify_one() was made.
这意味着,要想唤醒一个正在等待的线程,通知操作必须发生在线程开始等待之后。如果顺序颠倒,通知信号可能会被“错过”,导致线程永久阻塞(除非发生虚假唤醒)。
下面这个程序就演示了这种因错过通知而永久挂起的情况:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
class CondVarEarlyNotifyDemo {
private:
std::mutex mtx_;
std::condition_variable cv_;
std::string data_;
bool is_data_ready_ = false;
bool is_processed_ = false;
public:
void WorkerThread(){
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟延迟,让主线程先运行并发出通知
std::cout << "[Worker Thread] Start" << std::endl;
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock); // 使用不带谓词的wait
std::cout << "[Worker Thread] Processing data..." << std::endl;
data_ += " after processing";
is_processed_ = true;
std::cout << "[Worker Thread] Data processing completed, notify master thread" << std::endl;
lock.unlock();
cv_.notify_one();
}
void MasterThread(){
std::cout << "[Master Thread] Start" << std::endl;
data_ = "Example data";
{
std::lock_guard<std::mutex> lock(mtx_);
is_data_ready_ = true;
std::cout << "[Master Thread] Data is ready, notify worker thread" << std::endl;
}
cv_.notify_one(); // 此通知发生在工作线程开始wait之前,信号被错过
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock); // 主线程也会永久等待
}
std::cout << "[Master Thread] Final processed data: " << data_ << std::endl;
}
};
int main(){
CondVarEarlyNotifyDemo demo;
std::thread worker(&CondVarEarlyNotifyDemo::WorkerThread, &demo);
std::thread master(&CondVarEarlyNotifyDemo::MasterThread, &demo);
worker.join();
master.join();
return 0;
}
在这个有缺陷的例子中,主线程很快发出通知,而工作线程因sleep延迟了1秒才开始等待。结果就是通知信号被发出时,没有线程在等待,信号无效,导致双方都陷入永久等待。
有一种标准的技巧可以完美规避“错过通知”的问题:使用带谓词(predicate)的 wait 函数,即 wait(lock, stop_waiting),而不是简单的 wait(lock)。只有当谓词 stop_waiting 返回 false 时,线程才会真正尝试进入等待状态。这样,即使通知提前发出,线程检查谓词发现条件已满足,也会直接继续执行,不会被阻塞。
修复后的代码如下:
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
class CondVarPredicateFixDemo {
private:
std::mutex mtx_;
std::condition_variable cv_;
std::string data_;
bool is_data_ready_ = false;
bool is_processed_ = false;
public:
void WorkerThread(){
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "[Worker Thread] Start" << std::endl;
std::unique_lock<std::mutex> lock(mtx_);
// 使用带谓词的wait,检查 is_data_ready_
cv_.wait(lock, [this]() { return is_data_ready_; });
std::cout << "[Worker Thread] Processing data..." << std::endl;
data_ += " after processing";
is_processed_ = true;
std::cout << "[Worker Thread] Data processing completed, notify master thread" << std::endl;
lock.unlock();
cv_.notify_one();
}
void MasterThread(){
std::cout << "[Master Thread] Start" << std::endl;
data_ = "Example data";
{
std::lock_guard<std::mutex> lock(mtx_);
is_data_ready_ = true;
std::cout << "[Master Thread] Data is ready, notify worker thread" << std::endl;
}
cv_.notify_one();
{
std::unique_lock<std::mutex> lock(mtx_);
// 主线程的wait也使用谓词,检查 is_processed_
cv_.wait(lock, [this]() { return is_processed_; });
}
std::cout << "[Master Thread] Final processed data: " << data_ << std::endl;
}
};
int main(){
CondVarPredicateFixDemo demo;
std::thread worker(&CondVarPredicateFixDemo::WorkerThread, &demo);
std::thread master(&CondVarPredicateFixDemo::MasterThread, &demo);
worker.join();
master.join();
return 0;
}
输出如下,程序正确运行:
[Master Thread] Start
[Master Thread] Data is ready, notify worker thread
[Worker Thread] Start
[Worker Thread] Processing data...
[Worker Thread] Data processing completed, notify master thread
[Master Thread] Final processed data: Example data after processing
使用带谓词的 wait 还有另一个至关重要的作用:防御虚假唤醒(Spurious wakeup)。
所谓虚假唤醒,是指在某些系统实现下,即使没有线程调用 notify_one() 或 notify_all(),等待在条件变量上的线程也可能被无缘无故地唤醒。这看似不合理,却是标准所允许的,可能导致程序逻辑错误。
带谓词的 wait(lock, stop_waiting) 在内部等价于以下循环:
while (!stop_waiting()) {
wait(lock);
}
正是这个循环结构,使得即使发生了虚假唤醒,线程在检查谓词后发现条件仍未满足,便会再次进入等待状态,从而保证了程序的正确性。因此,在涉及高并发的多线程编程中,始终使用带谓词的 wait 是一种强健且推荐的做法。
通过分析条件变量的“错过通知”和“虚假唤醒”这两个经典陷阱,我们可以更深刻地理解线程同步机制的细节。掌握这些知识,有助于我们编写出更稳定、可靠的并发程序。如果你对 C++ 并发编程有更多兴趣,欢迎在 云栈社区 与其他开发者交流探讨。