0. 引言
在 云栈社区 的日常技术交流中,我们经常讨论智能指针的线程安全问题。虽然我们知道 shared_ptr 的引用计数更新操作是原子的(Atomic),能够保证引用计数的增减是线程安全的,但其对“指针本身”的修改却并非原子操作。
这意味着,在 多线程 环境下,如果多个线程同时读写同一个 shared_ptr 对象,极易引发竞态条件(Race Condition),导致未定义的行为或程序崩溃。
请看以下典型错误示例:
// 看似安全,实则存在竞态条件
std::shared_ptr<Config> global_config = std::make_shared<Config>();
void reader_thread(){
// 问题1:读取过程中的竞态条件
if (!global_config->settings.empty()) { // 读取指针本身
// 在这一刻,另一个线程可能已经修改了global_config
global_config->process(); // 可能使用悬空指针!
}
}
void writer_thread(){
// 问题2:修改操作的非原子性
auto new_config = std::make_shared<Config>();
global_config = new_config; // 这个赋值不是原子的!
}
为了实现安全的更新,传统做法是使用互斥锁(Mutex),但在高并发场景下,锁竞争往往会成为性能瓶颈。为了解决这一痛点,C++ 20 标准库正式引入了 std::atomic<std::shared_ptr<T>>。
本文将深入分析其设计思路、底层实现原理、源码细节以及实际应用场景,帮助大家理解其背后的设计哲学,并掌握在实际开发中的正确使用方法。
1. 核心目标与设计哲学
std::atomic<std::shared_ptr<T>> 的设计初衷并非是要“重新发明 shared_ptr”,而是在现有 shared_ptr 的基础上,为其提供原子化的读写访问接口。其核心设计哲学可概括为:“兼容原有语义、最小化性能开销、保证原子性与内存序”。
1.1 核心目标
atomic_shared_ptr 致力于让“对同一共享指针对象的跨线程读写”成为原子操作,具体包含以下两点保证:
- 消除竞态条件:当多个线程同时对
atomic_shared_ptr 执行拷贝、赋值、重置等操作时,确保引用计数的更新与指针的切换是同步的,不会出现数据竞争。
- 原子语义:操作结果具有可预测性,符合原子操作的“不可中断性”语义。即操作要么完全执行成功,要么完全不执行,绝不会出现中间状态。
1.2 设计思路
在 C++ 标准库的演进过程中,我们可以看到两种不同的实现策略:
- C++20 之前:主要依赖全局哈希锁表和重载特定函数来实现(下文源码解读部分会详细展开)。
- C++20 之后:特化了原子变量对于
shared_ptr 的实现。
C++20 后的核心设计思路是:通过一个高效的、定制化的内部锁机制,将非原子的 shared_ptr 双指针结构(对象指针 + 控制块指针),在逻辑上封装成原子的、线程安全的操作。
值得注意的是,其设计哲学明确优先考虑兼容性和实现的稳健性,而非盲目追求硬件层面的无锁(Lock-Free)特性,因为在大多数硬件架构上,原子地操作两个指针(128位或更多)是非常昂贵甚至不支持的。
2. 实现原理剖析
我们可以将实现原理分为两个阶段来进行解读:
- 阶段一(C++20 前):提供了一组独立的原子操作函数(如
std::atomic_load),通过外部的锁定策略来保证安全。
- 阶段二(C++20 后):通过嵌入式的自旋锁优化性能。利用内存对齐特性——
shared_ptr 的控制块指针(_Sp_counted_base*)内存对齐至少大于 1 字节,因此其指针值的最低有效位(LSB)恒为 0。标准库巧妙地复用这一位作为锁标记(Lock Bit),从而实现对数据的原子操作。
3. 源码实现深度解读
3.1 C++20 前源码分析
在 C++20 之前,标准库通常使用一个全局的锁池来管理所有 shared_ptr 的原子操作。
1)核心结构:_Sp_locker
该结构使用指针地址计算哈希值作为锁的键值。_M_key1 和 _M_key2 存储哈希值,对应全局锁数组中的索引。这种设计允许不同对象映射到不同的锁,从而减少锁竞争。
struct _Sp_locker
{
#ifdef __GTHREADS // 有线程支持
explicit _Sp_locker(const void*) noexcept;
_Sp_locker(const void*, const void*) noexcept;
~_Sp_locker();
private:
unsigned char _M_key1;
unsigned char _M_key2;
#else // 无线程支持
explicit _Sp_locker(const void*, const void* = nullptr) { }
#endif
};
2)原子操作实现
以 atomic_store_explicit 为例,其内存序参数实际上被忽略了,因为互斥锁本身就提供了完整的内存屏障。
template<typename _Tp>
inline void
atomic_store_explicit(shared_ptr<_Tp>* __p, shared_ptr<_Tp> __r, memory_order)
{
_Sp_locker __lock{__p}; // 1. 对__p加锁
__p->swap(__r); // 2. 交换内容
// 3. __lock析构,自动解锁
// 4. __r(现在持有旧值)离开作用域,可能触发析构
}
3.2 C++20 后源码分析
C++20 引入了更为精细的实现。从架构上来看,它自下而上分为三层:
1)最底层:_Atomic_count (嵌入指针的自旋锁)
这一层实际存储一个 uintptr_t 类型的原子变量,利用最低有效位(LSB)作为锁标志位。代码中通过静态断言(Static Assert)来确保指针的对齐方式满足要求,即最低位必须为 0。
struct _Atomic_count {
mutable __atomic_base<uintptr_t> _M_val{0};
static constexpr uintptr_t _S_lock_bit{1}; // 锁标志位
};
// 确保控制块指针至少有2字节对齐,LSB总是0(因为地址是2的倍数)
static_assert(alignof(remove_pointer_t<pointer>) > 1);
加锁操作实现:
pointer lock(memory_order __o) const noexcept
{
// 步骤1:等待锁释放(自旋)
auto __current = _M_val.load(memory_order_relaxed);
while (__current & _S_lock_bit) // 锁被占用
{
#if __cpp_lib_atomic_wait
__detail::__thread_relax();
#endif
__current = _M_val.load(memory_order_relaxed);
}
// 步骤2:尝试获取锁(CAS操作)
_GLIBCXX_TSAN_MUTEX_TRY_LOCK(&_M_val);
while (!_M_val.compare_exchange_strong(__current,
__current | _S_lock_bit,
__o,
memory_order_relaxed))
{
_GLIBCXX_TSAN_MUTEX_TRY_LOCK_FAILED(&_M_val);
__current = __current & ~_S_lock_bit; // 清除可能的锁位
// 继续尝试...
}
_GLIBCXX_TSAN_MUTEX_LOCKED(&_M_val);
// 步骤3:返回去锁后的指针
return reinterpret_cast<pointer>(__current);
}
解锁操作实现:
解锁操作非常高效,通过直接减一的方式清除锁位(因为持有锁的线程已知锁位为 1)。
void unlock(memory_order __o) const noexcept
{
_GLIBCXX_TSAN_MUTEX_PRE_UNLOCK(&_M_val);
_M_val.fetch_sub(1, __o); // 减1清除锁位
_GLIBCXX_TSAN_MUTEX_POST_UNLOCK(&_M_val);
}
2)中间层:_Sp_atomic (原子化包装器)
这一层利用 _Atomic_count 提供的细粒度锁,将非原子的“数据指针 + 控制块指针”这对成员,包装成具有原子语义的整体。
template<typename _Tp>
class _Sp_atomic {
typename _Tp::element_type* _M_ptr = nullptr; // 1. 数据指针
_Atomic_count _M_refcount; // 2. 原子化引用计数(含锁)
// ... 方法
};
_M_ptr:即 shared_ptr.get() 返回的原始指针,指向用户数据。它本身是非原子的。
_M_refcount:核心组件,原子地管理着控制块指针和锁。
构造函数:
constexpr _Sp_atomic() noexcept = default;
explicit _Sp_atomic(value_type __r) noexcept
: _M_ptr(__r._M_ptr), _M_refcount(std::move(__r._M_refcount))
{ }
- 默认构造:创建一个“空”的原子包装器。
- 移动构造:进行成员级的移动。注意,此过程不涉及原子操作或锁,仅在单线程初始化时使用。
关键接口分析:load 操作
value_type load(memory_order __o) const noexcept
{
// 1. 输入验证:load不能是释放语义
__glibcxx_assert(__o != memory_order_release && __o != memory_order_acq_rel);
// 2. 内存序提升:确保能读到最新的数据
if (__o != memory_order_seq_cst) __o = memory_order_acquire;
value_type __ret;
// 3. 【关键加锁】获取当前控制块指针并锁定
auto __pi = _M_refcount.lock(__o);
// 4. 在锁的保护下,安全地拷贝数据指针
__ret._M_ptr = _M_ptr;
// 5. 【关键操作】增加引用计数,并将控制块指针赋给返回值
__ret._M_refcount._M_pi = _S_add_ref(__pi);
// 6. 解锁
_M_refcount.unlock(memory_order_relaxed);
// 7. 构造并返回一个完整的 shared_ptr
return __ret;
}
关键接口分析:compare_exchange_strong 操作
bool compare_exchange_strong(value_type& __expected, value_type __desired,
memory_order __o, memory_order __o2) noexcept
{
bool __result = true;
// 1. 【加锁】获取当前状态
auto __pi = _M_refcount.lock(memory_order_acquire);
// 2. 【比较】在锁的保护下,比较当前状态与期望值__expected
if (_M_ptr == __expected._M_ptr && __pi == __expected._M_refcount._M_pi)
{
// 3a. 【相等,执行交换】
_M_ptr = __desired._M_ptr; // 设置新数据指针
_M_refcount._M_swap_unlock(__desired._M_refcount, __o); // 交换控制块并解锁
}
else
{
// 3b. 【不相等,更新期望值】
// 先移动__expected到临时变量,防止其资源在后续操作中被意外释放
_Tp __sink = std::move(__expected);
// 将当前实际值写入__expected,供调用者下次重试使用
__expected._M_ptr = _M_ptr;
__expected._M_refcount._M_pi = _S_add_ref(__pi); // 增加引用计数!
// 解锁(使用失败内存序__o2)
_M_refcount.unlock(__o2);
__result = false;
}
return __result;
}
3)最上层:std::atomic<shared_ptr<T>> (用户接口)
这是面向用户的最终接口类,它持有一个 _Sp_atomic 成员,并将所有操作转发给它。
template<typename _Tp>
struct atomic<shared_ptr<_Tp>>
{
public:
using value_type = shared_ptr<_Tp>;
static constexpr bool is_always_lock_free = false; // 关键声明
private:
_Sp_atomic<shared_ptr<_Tp>> _M_impl; // 唯一的核心数据成员
};
一次 store 调用的完整工作流程:
假设调用 atomic<shared_ptr<int>>::store(new_value):
- 调用转发:
store 调用内部成员 _M_impl.swap(new_value, memory_order)。
- 包装器加锁:
_Sp_atomic::swap 调用 _M_refcount.lock()。假设当前控制块指针是 0x1000 (LSB为0),lock 操作通过 CAS 将其原子地改为 0x1001,成功获得锁。
- 交换数据:在锁的保护下,交换
_M_ptr 和 new_value 中的数据指针。
- 交换计数与解锁:调用
_M_refcount._M_swap_unlock,将 _M_val 从 0x1001 原子地交换为 new_value 的控制块指针(假设为 0x2000)。操作完成后,锁位(LSB)自动归零。
- 旧值析构:形参
new_value 现在持有旧的指针和旧的控制块(0x1000)。随着函数返回,new_value 析构,安全地减少旧控制块的引用计数。
3.3 新旧实现对比
| 特性维度 |
C++20 之前(传统实现) |
C++20 及以后(特化实现) |
| 核心机制 |
全局哈希锁池 (_Sp_locker) |
嵌入指针的自旋锁 (_Atomic_count) |
| 数据结构 |
普通 std::shared_ptr<T>,由外部锁保护 |
专用 _Sp_atomic 结构,分离存储数据指针和原子引用计数 |
| 锁粒度 |
粗粒度。基于地址哈希,不同对象可能竞争同一把锁 |
细粒度。锁状态嵌入每个对象的控制块指针中,真正做到每对象一把锁 |
| 同步原语 |
依赖操作系统互斥锁 (std::mutex),可能导致挂起 |
用户态自旋锁,通过 CAS 在指针 LSB 位上忙等待,低竞争下性能极高 |
| 内存序 |
锁本身提供强内存屏障,用户指定的内存序基本被忽略 |
精细处理。将用户指定的内存序传递给底层原子操作,开销更可控 |
| API 形式 |
独立的自由函数,如 atomic_load |
模板特化类 std::atomic<std::shared_ptr<T>> |
| 无锁特性 |
否,且无法查询 |
否,但通过 is_always_lock_free 明确告知。这是一种“诚实的”有锁实现 |
| 设计哲学 |
兼容性优先。最稳健、易实现的方案 |
性能与兼容性平衡。利用对齐特性,追求极致的空间和时间效率 |
4. 实战应用:线程安全的全局配置管理
假设我们需要维护一个全局配置(GlobalConfig),它由后台线程周期性更新,同时有多个工作线程频繁读取。我们需要保证:工作线程在读取配置时,绝不会读到正在修改中、处于不一致状态的配置对象。
以下是使用 std::atomic<std::shared_ptr<T>> 的完整实战代码:
#include <iostream>
#include <memory>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <mutex>
// 1. 模拟一个复杂的全局配置类
struct GlobalConfig {
int timeoutMs;
std::string serverUrl;
std::vector<int> featureFlags;
void print(const std::string& from) const {
static std::mutex io_mutex; // 保护控制台输出
std::lock_guard<std::mutex> lock(io_mutex);
std::cout << "[" << from << "] Config: timeout=" << timeoutMs
<< "ms, server=" << serverUrl
<< ", flags size=" << featureFlags.size() << std::endl;
}
};
class ConfigManager {
private:
// 2. 核心:使用 atomic<shared_ptr> 持有当前配置
std::atomic<std::shared_ptr<const GlobalConfig>> currentConfig_;
// 用于模拟配置更新的简单锁(实际可能从文件/网络读取)
std::mutex configUpdateMutex_;
public:
ConfigManager() {
// 3. 初始化一个默认配置
auto defaultConfig = std::make_shared<GlobalConfig>();
defaultConfig->timeoutMs = 1000;
defaultConfig->serverUrl = "https://default.example.com";
defaultConfig->featureFlags = {1, 2, 3};
currentConfig_.store(defaultConfig);
}
// 4. 工作线程安全获取当前配置(无锁读)
std::shared_ptr<const GlobalConfig> getCurrentConfig() const {
// 原子加载,返回 shared_ptr 的副本,线程安全且高效
return currentConfig_.load(std::memory_order_acquire);
}
// 5. 后台更新线程:原子地更新整个配置
void updateConfig(int newTimeout, const std::string& newUrl) {
// 在本地构建全新的配置对象,不影响正在使用的旧配置
auto newConfig = std::make_shared<GlobalConfig>();
newConfig->timeoutMs = newTimeout;
newConfig->serverUrl = newUrl;
newConfig->featureFlags = {4, 5, 6, 7}; // 模拟配置变更
// 【关键操作】原子地替换全局配置指针
// store() 内部会确保新旧配置的切换对所有线程立即可见
currentConfig_.store(newConfig, std::memory_order_release);
// 旧配置(如果无其他持有者)将在此函数结束后被自动释放
std::cout << "Config updated by updater thread." << std::endl;
}
// 6. 高级用法:使用 compare_exchange 实现“仅当配置未被更改过才更新”的逻辑
bool updateConfigIfUnchanged(std::shared_ptr<const GlobalConfig>& expected,
const GlobalConfig& newValue) {
std::shared_ptr<const GlobalConfig> newConfig = std::make_shared<GlobalConfig>(newValue);
// 尝试原子比较并交换
// 如果 currentConfig_ 仍然等于 expected,则将其替换为 newConfig
return currentConfig_.compare_exchange_strong(expected, newConfig);
}
};
// 模拟工作线程
void workerThread(int id, const ConfigManager& manager) {
for(int i = 0; i < 3; ++i) {
// 安全地获取当前配置的快照
auto config = manager.getCurrentConfig();
config->print("Worker " + std::to_string(id));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
int main() {
ConfigManager manager;
// 启动3个工作线程
std::vector<std::thread> workers;
for(int i = 0; i < 3; ++i) {
workers.emplace_back(workerThread, i, std::ref(manager));
}
// 主线程模拟后台配置更新
std::this_thread::sleep_for(std::chrono::seconds(1));
manager.updateConfig(2000, "https://updated.example.com");
std::this_thread::sleep_for(std::chrono::seconds(1));
manager.updateConfig(3000, "https://final.example.com");
// 等待所有工作线程结束
for(auto& t : workers) {
t.join();
}
// 7. 演示高级的 compare_exchange 用法
std::cout << "\n--- Testing compare_exchange ---" << std::endl;
auto oldConfig = manager.getCurrentConfig();
GlobalConfig newConfigData = {500, "https://cas.example.com", {8, 9}};
// 第一次尝试:应该成功(因为我们持有最新的配置)
bool success1 = manager.updateConfigIfUnchanged(oldConfig, newConfigData);
std::cout << "First CAS attempt: " << (success1 ? "成功" : "失败") << std::endl;
// 第二次尝试:应该失败(因为配置已经被我们自己更新了)
bool success2 = manager.updateConfigIfUnchanged(oldConfig, newConfigData);
std::cout << "Second CAS attempt: " << (success2 ? "成功" : "失败") << std::endl;
return 0;
}
通过上述代码,我们不仅实现了配置的热更新,还保证了所有读取操作的线程安全性。atomic<shared_ptr> 在这里充当了关键的同步桥梁,使得我们无需显式管理复杂的互斥锁,即可享受到标准库带来的安全与高效。