首页 > 解决方案 > T、volatile T 和 std::atomic 有什么区别?

问题描述

鉴于以下示例打算等待另一个线程存储42在共享变量中shared而没有锁且不等待线程终止,为什么需要volatile Tstd::atomic<T>建议保证并发正确性?

#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

int main()
{
  int64_t shared = 0;
  std::thread thread([&shared]() {
    shared = 42;
  });
  while (shared != 42) {
  }
  assert(shared == 42);
  thread.join();
  return 0;
}

使用 GCC 4.8.5 和默认选项,该示例按预期工作。

标签: c++multithreadingc++11concurrencystdatomic

解决方案


测试似乎表明样本是正确的,但事实并非如此。类似的代码很容易最终投入生产,甚至可以完美运行多年。

我们可以从编译示例开始-O3。现在,样本无限期挂起。(默认是-O0, no optimization / debug-consistency ,这有点类似于 make each variable volatile这就是测试没有将代码显示为 unsafe 的原因。)

要找到根本原因,我们必须检查生成的程序集。首先,基于 GCC 4.8.5-O0的 x86_64 程序集对应于未优化的工作二进制文件:

        // Thread B:
        // shared = 42;
        movq    -8(%rbp), %rax
        movq    (%rax), %rax
        movq    $42, (%rax)

        // Thread A:
        // while (shared != 42) {
        // }
.L11:
        movq    -32(%rbp), %rax     # Check shared every iteration
        cmpq    $42, %rax
        jne     .L11

线程 B 对 中的值执行简单的42存储shared。线程 A 读取shared每个循环迭代,直到比较表明相等。

现在,我们将其与-O3结果进行比较:

        // Thread B:
        // shared = 42;
        movq    8(%rdi), %rax
        movq    $42, (%rax)

        // Thread A:
        // while (shared != 42) {
        // }
        cmpq    $42, (%rsp)         # check shared once
        je      .L87                # and skip the infinite loop or not
.L88:
        jmp     .L88                # infinite loop
.L87:

与相关的优化-O3将循环替换为单个比较,如果不相等,则使用无限循环以匹配预期行为。在 GCC 10.2 中,循环被优化了。(与 C 不同,没有副作用或易失性访问的无限循环在 C++ 中是未定义的行为。)

问题是编译器及其优化器不知道实现的并发含义。因此,结论必须是shared在线程 A 中不能改变 - 循环相当于死代码。(或者换句话说,数据竞争是 UB,并且允许优化器假设程序没有遇到 UB。如果你正在读取一个非原子变量,那一定意味着没有其他人在编写它。这个是允许编译器将负载提升出循环的原因,以及类似的接收器存储,这对于非共享变量的正常情况是非常有价值的优化。)

该解决方案要求我们与shared参与线程间通信的编译器进行通信。实现这一目标的一种方法可能是volatile. 虽然不同编译器的实际含义volatile不同,并且保证(如果有的话)是特定于编译器的,但普遍的共识是,这会volatile阻止编译器在基于寄存器的缓存方面优化易失性访问。这对于与硬件交互并在并发编程中占有一席之地的低级代码至关重要,尽管由于std::atomic.

随着volatile int64_t shared,生成的指令变化如下:

        // Thread B:
        // shared = 42;
        movq    24(%rdi), %rax
        movq    $42, (%rax)

        // Thread A:
        // while (shared != 42) {
        // }
.L87:
        movq    8(%rsp), %rax
        cmpq    $42, %rax
        jne     .L87

循环不能再被消除,因为它必须假设已经shared改变,即使没有代码形式的证据。因此,该示例现在适用于-O3.

如果volatile解决了这个问题,你为什么需要std::atomic?与无锁代码相关的两个方面是std::atomic必不可少的:内存操作原子性和内存顺序。

为了构建加载/存储原子性的案例,我们查看了使用GCC4.8.5 -O3 -m32(32 位版本)编译的生成程序集volatile int64_t shared

        // Thread B:
        // shared = 42;
        movl    4(%esp), %eax
        movl    12(%eax), %eax
        movl    $42, (%eax)
        movl    $0, 4(%eax)

        // Thread A:
        // while (shared != 42) {
        // }
.L88:                               # do {
        movl    40(%esp), %eax
        movl    44(%esp), %edx
        xorl    $42, %eax
        movl    %eax, %ecx
        orl     %edx, %ecx
        jne     .L88                # } while(shared ^ 42 != 0);

对于 32 位 x86 代码生成,64 位加载和存储通常分为两条指令。对于单线程代码,这不是问题。对于多线程代码,这意味着另一个线程可以看到 64 位内存操作的部分结果,从而为可能不会 100% 导致问题的意外不一致留出空间,但可能会随机发生受周围代码和软件使用模式的影响很大。即使 GCC 选择生成默认保证原子性的指令,这仍然不会影响其他编译器,并且可能不适用于所有支持的平台。

为了防止在所有情况下以及跨所有编译器和支持的平台进行部分加载/存储,std::atomic可以使用。让我们回顾一下如何std::atomic影响生成的程序集。更新的样本:

#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

int main()
{
  std::atomic<int64_t> shared;
  std::thread thread([&shared]() {
    shared.store(42, std::memory_order_relaxed);
  });
  while (shared.load(std::memory_order_relaxed) != 42) {
  }
  assert(shared.load(std::memory_order_relaxed) == 42);
  thread.join();
  return 0;
}

基于 GCC 10.2 生成的 32 位程序集(-O3https ://godbolt.org/z/8sPs55nzT ):

        // Thread B:
        // shared.store(42, std::memory_order_relaxed);
        movl    $42, %ecx
        xorl    %ebx, %ebx
        subl    $8, %esp
        movl    16(%esp), %eax
        movl    4(%eax), %eax       # function arg: pointer to  shared
        movl    %ecx, (%esp)
        movl    %ebx, 4(%esp)
        movq    (%esp), %xmm0       # 8-byte reload
        movq    %xmm0, (%eax)       # 8-byte store to  shared
        addl    $8, %esp

        // Thread A:
        // while (shared.load(std::memory_order_relaxed) != 42) {
        // }
.L9:                                # do {
        movq    -16(%ebp), %xmm1       # 8-byte load from shared
        movq    %xmm1, -32(%ebp)       # copy to a dummy temporary
        movl    -32(%ebp), %edx
        movl    -28(%ebp), %ecx        # and scalar reload
        movl    %edx, %eax
        movl    %ecx, %edx
        xorl    $42, %eax
        orl     %eax, %edx
        jne     .L9                 # } while(shared.load() ^ 42 != 0);

为了保证加载和存储的原子性,编译器发出一个 8 字节的SSE2movq指令(到/从 128 位 SSE 寄存器的下半部分)。此外,该组件显示,即使volatile被移除,该环仍然完好无损。

通过std::atomic在示例中使用,可以保证

  • std::atomic 加载和存储不受基于寄存器的缓存的影响
  • std::atomic 加载和存储不允许观察部分值

C++ 标准根本没有谈论寄存器,但它确实说:

实现应该使原子存储在合理的时间内对原子负载可见。

虽然这为解释留下了空间,但std::atomic跨迭代缓存负载,例如在我们的示例中触发(没有 volatile 或 atomic)显然是一种违规 - 存储可能永远不会变得可见。当前的编译器甚至不优化一个 block中的原子,例如在同一迭代中进行 2 次访问。

在 x86 上,自然对齐的加载/存储(其中地址是加载/存储大小的倍数)是原子的,最多 8 个字节,无需特殊指令。这就是 GCC 能够使用movq.

atomic<T>T硬件可能不直接支持large ,在这种情况下,编译器可以回退到使用 mutex

某些平台上的大T(例如 2 个寄存器的大小)可能需要原子 RMW 操作(如果编译器不简单地回退到锁定),有时提供的大小比最大的有效纯加载/纯-保证原子的存储。(例如在 x86-64、lock cmpxchg16或 ARM ldrexd/strexd重试循环上)。单指令原子 RMW(如 x86 使用)在内部涉及高速缓存行锁或总线锁。例如,旧版本的clang -m32for x86 将使用8 字节的 pure-load 或 pure-storelock cmpxchg8b来代替。movq

上面提到的第二个方面是什么,是什么std::memory_order_relaxed意思?编译器和 CPU 都可以重新排序内存操作以优化效率。重新排序的主要约束是所有加载和存储必须看起来已经按照代码给定的顺序(程序顺序)执行。因此,在线程间通信的情况下,必须考虑内存顺序以建立所需的顺序,尽管尝试重新排序。可以为std::atomic加载和存储指定所需的内存顺序。std::memory_order_relaxed不强加任何特定命令。

互斥原语强制执行特定的内存顺序(获取 - 释放顺序),以便内存操作保持在锁范围内,并且保证先前锁所有者执行的存储对后续锁所有者可见。因此,使用锁,这里提出的所有方面都可以通过使用锁定工具简单地解决。一旦您摆脱了提供的舒适锁,您就必须注意后果和影响并发正确性的因素。

尽可能明确地说明线程间通信是一个很好的起点,以便编译器知道加载/存储上下文并可以相应地生成代码。只要有可能,最好使用 std::atomic<T>with std::memory_order_relaxed(除非场景需要特定的内存顺序)而不是volatile T(当然还有T)。此外,尽可能不要滚动自己的无锁代码,以降低代码复杂性并最大限度地提高正确率。


推荐阅读