找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

1538

积分

0

好友

193

主题
发表于 3 天前 | 查看: 14| 回复: 0
本帖最后由 云栈大前端 于 2026-1-7 17:10 编辑

0. 引言

云栈社区 的日常技术交流中,我们经常讨论智能指针的线程安全问题。虽然我们知道 shared_ptr 的引用计数更新操作是原子的(Atomic),能够保证引用计数的增减是线程安全的,但其对“指针本身”的修改却并非原子操作。

这意味着,在 多线程 环境下,如果多个线程同时读写同一个 shared_ptr 对象,极易引发竞态条件(Race Condition),导致未定义的行为或程序崩溃。

请看以下典型错误示例:

// 看似安全,实则存在竞态条件
std::shared_ptr<Config> global_config = std::make_shared<Config>();

void reader_thread(){
    // 问题1:读取过程中的竞态条件
    if (!global_config->settings.empty()) {  // 读取指针本身
        // 在这一刻,另一个线程可能已经修改了global_config
        global_config->process();  // 可能使用悬空指针!
    }
}

void writer_thread(){
    // 问题2:修改操作的非原子性
    auto new_config = std::make_shared<Config>();
    global_config = new_config;  // 这个赋值不是原子的!
}

为了实现安全的更新,传统做法是使用互斥锁(Mutex),但在高并发场景下,锁竞争往往会成为性能瓶颈。为了解决这一痛点,C++ 20 标准库正式引入了 std::atomic<std::shared_ptr<T>>

本文将深入分析其设计思路、底层实现原理、源码细节以及实际应用场景,帮助大家理解其背后的设计哲学,并掌握在实际开发中的正确使用方法。

1. 核心目标与设计哲学

std::atomic<std::shared_ptr<T>> 的设计初衷并非是要“重新发明 shared_ptr”,而是在现有 shared_ptr 的基础上,为其提供原子化的读写访问接口。其核心设计哲学可概括为:“兼容原有语义、最小化性能开销、保证原子性与内存序”。

1.1 核心目标

atomic_shared_ptr 致力于让“对同一共享指针对象的跨线程读写”成为原子操作,具体包含以下两点保证:

  1. 消除竞态条件:当多个线程同时对 atomic_shared_ptr 执行拷贝、赋值、重置等操作时,确保引用计数的更新与指针的切换是同步的,不会出现数据竞争。
  2. 原子语义:操作结果具有可预测性,符合原子操作的“不可中断性”语义。即操作要么完全执行成功,要么完全不执行,绝不会出现中间状态。

1.2 设计思路

在 C++ 标准库的演进过程中,我们可以看到两种不同的实现策略:

  • C++20 之前:主要依赖全局哈希锁表和重载特定函数来实现(下文源码解读部分会详细展开)。
  • C++20 之后:特化了原子变量对于 shared_ptr 的实现。

C++20 后的核心设计思路是:通过一个高效的、定制化的内部锁机制,将非原子的 shared_ptr 双指针结构(对象指针 + 控制块指针),在逻辑上封装成原子的、线程安全的操作

值得注意的是,其设计哲学明确优先考虑兼容性和实现的稳健性,而非盲目追求硬件层面的无锁(Lock-Free)特性,因为在大多数硬件架构上,原子地操作两个指针(128位或更多)是非常昂贵甚至不支持的。

2. 实现原理剖析

我们可以将实现原理分为两个阶段来进行解读:

  1. 阶段一(C++20 前):提供了一组独立的原子操作函数(如 std::atomic_load),通过外部的锁定策略来保证安全。
  2. 阶段二(C++20 后):通过嵌入式的自旋锁优化性能。利用内存对齐特性——shared_ptr 的控制块指针(_Sp_counted_base*)内存对齐至少大于 1 字节,因此其指针值的最低有效位(LSB)恒为 0。标准库巧妙地复用这一位作为锁标记(Lock Bit),从而实现对数据的原子操作。

3. 源码实现深度解读

3.1 C++20 前源码分析

在 C++20 之前,标准库通常使用一个全局的锁池来管理所有 shared_ptr 的原子操作。

1)核心结构:_Sp_locker

该结构使用指针地址计算哈希值作为锁的键值。_M_key1_M_key2 存储哈希值,对应全局锁数组中的索引。这种设计允许不同对象映射到不同的锁,从而减少锁竞争。

struct _Sp_locker
{
#ifdef __GTHREADS  // 有线程支持
    explicit _Sp_locker(const void*) noexcept;
    _Sp_locker(const void*, const void*) noexcept;
    ~_Sp_locker();
private:
    unsigned char _M_key1;
    unsigned char _M_key2;
#else // 无线程支持
    explicit _Sp_locker(const void*, const void* = nullptr) { }
#endif
};

2)原子操作实现

atomic_store_explicit 为例,其内存序参数实际上被忽略了,因为互斥锁本身就提供了完整的内存屏障。

template<typename _Tp>
inline void
atomic_store_explicit(shared_ptr<_Tp>* __p, shared_ptr<_Tp> __r, memory_order)
{
    _Sp_locker __lock{__p};           // 1. 对__p加锁
    __p->swap(__r);                   // 2. 交换内容
    // 3. __lock析构,自动解锁
    // 4. __r(现在持有旧值)离开作用域,可能触发析构
}

3.2 C++20 后源码分析

C++20 引入了更为精细的实现。从架构上来看,它自下而上分为三层:

64033.webp

1)最底层:_Atomic_count (嵌入指针的自旋锁)

这一层实际存储一个 uintptr_t 类型的原子变量,利用最低有效位(LSB)作为锁标志位。代码中通过静态断言(Static Assert)来确保指针的对齐方式满足要求,即最低位必须为 0。

struct _Atomic_count {
    mutable __atomic_base<uintptr_t> _M_val{0};
    static constexpr uintptr_t _S_lock_bit{1};  // 锁标志位
};

// 确保控制块指针至少有2字节对齐,LSB总是0(因为地址是2的倍数)
static_assert(alignof(remove_pointer_t<pointer>) > 1);

加锁操作实现:

pointer lock(memory_order __o) const noexcept
{
    // 步骤1:等待锁释放(自旋)
    auto __current = _M_val.load(memory_order_relaxed);
    while (__current & _S_lock_bit)  // 锁被占用
    {
#if __cpp_lib_atomic_wait
        __detail::__thread_relax();  
#endif
        __current = _M_val.load(memory_order_relaxed);
    }
    // 步骤2:尝试获取锁(CAS操作)
    _GLIBCXX_TSAN_MUTEX_TRY_LOCK(&_M_val);
    while (!_M_val.compare_exchange_strong(__current,
                                           __current | _S_lock_bit,
                                           __o,
                                           memory_order_relaxed))
    {
        _GLIBCXX_TSAN_MUTEX_TRY_LOCK_FAILED(&_M_val);
        __current = __current & ~_S_lock_bit;  // 清除可能的锁位
        // 继续尝试...
    }
    _GLIBCXX_TSAN_MUTEX_LOCKED(&_M_val);
    // 步骤3:返回去锁后的指针
    return reinterpret_cast<pointer>(__current);
}

解锁操作实现:

解锁操作非常高效,通过直接减一的方式清除锁位(因为持有锁的线程已知锁位为 1)。

void unlock(memory_order __o) const noexcept
{
    _GLIBCXX_TSAN_MUTEX_PRE_UNLOCK(&_M_val);
    _M_val.fetch_sub(1, __o);  // 减1清除锁位
    _GLIBCXX_TSAN_MUTEX_POST_UNLOCK(&_M_val);
}

2)中间层:_Sp_atomic (原子化包装器)

这一层利用 _Atomic_count 提供的细粒度锁,将非原子的“数据指针 + 控制块指针”这对成员,包装成具有原子语义的整体。

template<typename _Tp>
class _Sp_atomic {
    typename _Tp::element_type* _M_ptr = nullptr; // 1. 数据指针
    _Atomic_count _M_refcount;                    // 2. 原子化引用计数(含锁)
    // ... 方法
};
  • _M_ptr:即 shared_ptr.get() 返回的原始指针,指向用户数据。它本身是非原子的。
  • _M_refcount:核心组件,原子地管理着控制块指针和锁。

构造函数:

    constexpr _Sp_atomic() noexcept = default;
    explicit _Sp_atomic(value_type __r) noexcept
    : _M_ptr(__r._M_ptr), _M_refcount(std::move(__r._M_refcount))
    { }
  • 默认构造:创建一个“空”的原子包装器。
  • 移动构造:进行成员级的移动。注意,此过程不涉及原子操作或锁,仅在单线程初始化时使用。

关键接口分析:load 操作

value_type load(memory_order __o) const noexcept
{
    // 1. 输入验证:load不能是释放语义
    __glibcxx_assert(__o != memory_order_release && __o != memory_order_acq_rel);
    // 2. 内存序提升:确保能读到最新的数据
    if (__o != memory_order_seq_cst) __o = memory_order_acquire;

    value_type __ret;
    // 3. 【关键加锁】获取当前控制块指针并锁定
    auto __pi = _M_refcount.lock(__o);
    // 4. 在锁的保护下,安全地拷贝数据指针
    __ret._M_ptr = _M_ptr;
    // 5. 【关键操作】增加引用计数,并将控制块指针赋给返回值
    __ret._M_refcount._M_pi = _S_add_ref(__pi);
    // 6. 解锁
    _M_refcount.unlock(memory_order_relaxed);
    // 7. 构造并返回一个完整的 shared_ptr
    return __ret;
}

关键接口分析:compare_exchange_strong 操作

bool compare_exchange_strong(value_type& __expected, value_type __desired,
                            memory_order __o, memory_order __o2) noexcept
{
    bool __result = true;
    // 1. 【加锁】获取当前状态
    auto __pi = _M_refcount.lock(memory_order_acquire);
    // 2. 【比较】在锁的保护下,比较当前状态与期望值__expected
    if (_M_ptr == __expected._M_ptr && __pi == __expected._M_refcount._M_pi)
    {
        // 3a. 【相等,执行交换】
        _M_ptr = __desired._M_ptr; // 设置新数据指针
        _M_refcount._M_swap_unlock(__desired._M_refcount, __o); // 交换控制块并解锁
    }
    else
    {
        // 3b. 【不相等,更新期望值】
        // 先移动__expected到临时变量,防止其资源在后续操作中被意外释放
        _Tp __sink = std::move(__expected);
        // 将当前实际值写入__expected,供调用者下次重试使用
        __expected._M_ptr = _M_ptr;
        __expected._M_refcount._M_pi = _S_add_ref(__pi); // 增加引用计数!
        // 解锁(使用失败内存序__o2)
        _M_refcount.unlock(__o2);
        __result = false;
    }
    return __result;
}

3)最上层:std::atomic<shared_ptr<T>> (用户接口)

这是面向用户的最终接口类,它持有一个 _Sp_atomic 成员,并将所有操作转发给它。

template<typename _Tp>
struct atomic<shared_ptr<_Tp>>
{
public:
    using value_type = shared_ptr<_Tp>;
    static constexpr bool is_always_lock_free = false; // 关键声明
private:
    _Sp_atomic<shared_ptr<_Tp>> _M_impl; // 唯一的核心数据成员
};

一次 store 调用的完整工作流程:

假设调用 atomic<shared_ptr<int>>::store(new_value)

  1. 调用转发store 调用内部成员 _M_impl.swap(new_value, memory_order)
  2. 包装器加锁_Sp_atomic::swap 调用 _M_refcount.lock()。假设当前控制块指针是 0x1000 (LSB为0),lock 操作通过 CAS 将其原子地改为 0x1001,成功获得锁。
  3. 交换数据:在锁的保护下,交换 _M_ptrnew_value 中的数据指针。
  4. 交换计数与解锁:调用 _M_refcount._M_swap_unlock,将 _M_val0x1001 原子地交换为 new_value 的控制块指针(假设为 0x2000)。操作完成后,锁位(LSB)自动归零。
  5. 旧值析构:形参 new_value 现在持有旧的指针和旧的控制块(0x1000)。随着函数返回,new_value 析构,安全地减少旧控制块的引用计数。

3.3 新旧实现对比

特性维度 C++20 之前(传统实现) C++20 及以后(特化实现)
核心机制 全局哈希锁池 (_Sp_locker) 嵌入指针的自旋锁 (_Atomic_count)
数据结构 普通 std::shared_ptr<T>,由外部锁保护 专用 _Sp_atomic 结构,分离存储数据指针和原子引用计数
锁粒度 粗粒度。基于地址哈希,不同对象可能竞争同一把锁 细粒度。锁状态嵌入每个对象的控制块指针中,真正做到每对象一把锁
同步原语 依赖操作系统互斥锁 (std::mutex),可能导致挂起 用户态自旋锁,通过 CAS 在指针 LSB 位上忙等待,低竞争下性能极高
内存序 锁本身提供强内存屏障,用户指定的内存序基本被忽略 精细处理。将用户指定的内存序传递给底层原子操作,开销更可控
API 形式 独立的自由函数,如 atomic_load 模板特化类 std::atomic<std::shared_ptr<T>>
无锁特性 否,且无法查询 否,但通过 is_always_lock_free 明确告知。这是一种“诚实的”有锁实现
设计哲学 兼容性优先。最稳健、易实现的方案 性能与兼容性平衡。利用对齐特性,追求极致的空间和时间效率

4. 实战应用:线程安全的全局配置管理

假设我们需要维护一个全局配置(GlobalConfig),它由后台线程周期性更新,同时有多个工作线程频繁读取。我们需要保证:工作线程在读取配置时,绝不会读到正在修改中、处于不一致状态的配置对象。

以下是使用 std::atomic<std::shared_ptr<T>> 的完整实战代码:

#include <iostream>
#include <memory>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
#include <mutex>

// 1. 模拟一个复杂的全局配置类
struct GlobalConfig {
    int timeoutMs;
    std::string serverUrl;
    std::vector<int> featureFlags;

    void print(const std::string& from) const {
        static std::mutex io_mutex; // 保护控制台输出
        std::lock_guard<std::mutex> lock(io_mutex);
        std::cout << "[" << from << "] Config: timeout=" << timeoutMs 
                  << "ms, server=" << serverUrl 
                  << ", flags size=" << featureFlags.size() << std::endl;
    }
};

class ConfigManager {
private:
    // 2. 核心:使用 atomic<shared_ptr> 持有当前配置
    std::atomic<std::shared_ptr<const GlobalConfig>> currentConfig_;

    // 用于模拟配置更新的简单锁(实际可能从文件/网络读取)
    std::mutex configUpdateMutex_;

public:
    ConfigManager() {
        // 3. 初始化一个默认配置
        auto defaultConfig = std::make_shared<GlobalConfig>();
        defaultConfig->timeoutMs = 1000;
        defaultConfig->serverUrl = "https://default.example.com";
        defaultConfig->featureFlags = {1, 2, 3};
        currentConfig_.store(defaultConfig);
    }

    // 4. 工作线程安全获取当前配置(无锁读)
    std::shared_ptr<const GlobalConfig> getCurrentConfig() const {
        // 原子加载,返回 shared_ptr 的副本,线程安全且高效
        return currentConfig_.load(std::memory_order_acquire);
    }

    // 5. 后台更新线程:原子地更新整个配置
    void updateConfig(int newTimeout, const std::string& newUrl) {
        // 在本地构建全新的配置对象,不影响正在使用的旧配置
        auto newConfig = std::make_shared<GlobalConfig>();
        newConfig->timeoutMs = newTimeout;
        newConfig->serverUrl = newUrl;
        newConfig->featureFlags = {4, 5, 6, 7}; // 模拟配置变更

        // 【关键操作】原子地替换全局配置指针
        // store() 内部会确保新旧配置的切换对所有线程立即可见
        currentConfig_.store(newConfig, std::memory_order_release);

        // 旧配置(如果无其他持有者)将在此函数结束后被自动释放
        std::cout << "Config updated by updater thread." << std::endl;
    }

    // 6. 高级用法:使用 compare_exchange 实现“仅当配置未被更改过才更新”的逻辑
    bool updateConfigIfUnchanged(std::shared_ptr<const GlobalConfig>& expected, 
                                 const GlobalConfig& newValue) {
        std::shared_ptr<const GlobalConfig> newConfig = std::make_shared<GlobalConfig>(newValue);

        // 尝试原子比较并交换
        // 如果 currentConfig_ 仍然等于 expected,则将其替换为 newConfig
        return currentConfig_.compare_exchange_strong(expected, newConfig);
    }
};

// 模拟工作线程
void workerThread(int id, const ConfigManager& manager) {
    for(int i = 0; i < 3; ++i) {
        // 安全地获取当前配置的快照
        auto config = manager.getCurrentConfig();
        config->print("Worker " + std::to_string(id));

        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
}

int main() {
    ConfigManager manager;

    // 启动3个工作线程
    std::vector<std::thread> workers;
    for(int i = 0; i < 3; ++i) {
        workers.emplace_back(workerThread, i, std::ref(manager));
    }

    // 主线程模拟后台配置更新
    std::this_thread::sleep_for(std::chrono::seconds(1));
    manager.updateConfig(2000, "https://updated.example.com");

    std::this_thread::sleep_for(std::chrono::seconds(1));
    manager.updateConfig(3000, "https://final.example.com");

    // 等待所有工作线程结束
    for(auto& t : workers) {
        t.join();
    }

    // 7. 演示高级的 compare_exchange 用法
    std::cout << "\n--- Testing compare_exchange ---" << std::endl;
    auto oldConfig = manager.getCurrentConfig();
    GlobalConfig newConfigData = {500, "https://cas.example.com", {8, 9}};

    // 第一次尝试:应该成功(因为我们持有最新的配置)
    bool success1 = manager.updateConfigIfUnchanged(oldConfig, newConfigData);
    std::cout << "First CAS attempt: " << (success1 ? "成功" : "失败") << std::endl;

    // 第二次尝试:应该失败(因为配置已经被我们自己更新了)
    bool success2 = manager.updateConfigIfUnchanged(oldConfig, newConfigData);
    std::cout << "Second CAS attempt: " << (success2 ? "成功" : "失败") << std::endl;

    return 0;
}

通过上述代码,我们不仅实现了配置的热更新,还保证了所有读取操作的线程安全性。atomic<shared_ptr> 在这里充当了关键的同步桥梁,使得我们无需显式管理复杂的互斥锁,即可享受到标准库带来的安全与高效。




上一篇:MT6735双安卓系统平板评测:物理隔离双路CPU,8英寸摸鱼神器
下一篇:树莓派搭建VPN网关:NordVPN安装配置与性能优化指南
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-1-10 08:51 , Processed in 0.196576 second(s), 42 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2025 云栈社区.

快速回复 返回顶部 返回列表