c++ - 原子清除无符号整数的最低非零位
问题描述
问题:
我正在寻找以线程安全的方式清除无符号原子的最低非零位的最佳方法,std::atomic_uint64_t
而无需使用额外的互斥锁等。此外,我还需要知道,哪个位被清除了。
示例:
假设存储的当前值是0b0110
我想知道最低的非零位是位 1(0 索引)并将变量设置为0b0100
.
我想出的最好的版本是这样的:
#include <atomic>
#include <cstdint>
inline uint64_t with_lowest_non_zero_cleared(std::uint64_t v){
return v-1 & v;
}
inline uint64_t only_keep_lowest_non_zero(std::uint64_t v){
return v & ~with_lowest_non_zero_cleared(v);
}
uint64_t pop_lowest_non_zero(std::atomic_uint64_t& foo)
{
auto expected = foo.load(std::memory_order_relaxed);
while(!foo.compare_exchange_weak(expected,with_lowest_non_zero_cleared(expected), std::memory_order_seq_cst, std::memory_order_relaxed))
{;}
return only_keep_lowest_non_zero(expected);
}
有更好的解决方案吗?
笔记:
- 它不一定是最低的非零位 - 如果有区别,我也会对清除最高位或设置最高/最低“零位”的解决方案感到满意
- 我非常喜欢标准的 c++ (17) 版本,但会接受使用 clang 和 msvc 的内在函数的答案
- 它应该是可移植的(为 x64 和 AArch64 使用 clang 或 msvc 编译),但我最感兴趣的是使用 clang 编译时最近的 intel 和 AMD x64 架构的性能。
- 编辑:更新必须是原子的,并且必须保证全局进度,但就像上面的解决方案一样,它不必是无等待算法(当然,如果你能告诉我一个,我会很高兴)。
解决方案
x86 上没有对 atomic clear-lowest-bit 的直接硬件支持。 BMI1blsr
仅提供内存源形式,而不是内存目标形式;lock blsr [shared_var]
不存在。(编译器知道如何在您编译或启用假设 BMI1 支持的代码生成时针对本地变量进行优化(var-1) & (var)
。blsr
)-march=haswell
因此,即使您可以假设 BMI1 支持1,它也不会让您做任何根本不同的事情。
您可以在 x86 上做的最好的事情是lock cmpxchg
您在问题中提出的重试循环。这比在旧版本的变量中找到正确的位然后使用更好的选择lock btr
,特别是如果在位扫描和lock btr
. 如果另一个线程已经清除了你想要的位,你仍然需要一个重试循环。
CAS 重试循环在实践中也不错:重试非常罕见
(除非您的程序对共享变量的争用非常高,即使lock add
在没有尝试的情况下也会对性能产生问题,只是硬件仲裁访问缓存行。如果是这种情况,您可能应该重新设计无锁算法和/或考虑某种操作系统辅助的睡眠/唤醒,而不是让大量内核花费大量 CPU 时间锤击同一高速缓存行。无锁在低争用情况下非常好。)
expected
CPU 在加载和运行之间丢失高速缓存行的窗口lock cmpxchg
很小,因为该值的几条指令的结果。 通常它会在第一次通过时成功,因为运行时缓存行仍然存在于 L1d 缓存中cmpxchg
。当高速缓存行到达时,它(希望)已经处于 MESI 独占状态,如果 CPU 看到足够远的距离来为它执行 RFO。
您可以检测您的 cmpxchg 重试循环,以查看您在实际程序中实际获得了多少争用。(例如,通过增加循环内的局部变量,并在成功后使用原子松弛+=
到共享计数器中,或者使用线程局部计数器。)
请记住,您的真实代码(希望)在此位掩码上的原子操作之间做了大量工作,因此它与所有线程都将所有时间都花在该缓存行上的微基准测试非常不同。
编辑:更新必须是原子的,并且必须保证全局进度,但就像上面的解决方案一样,它不一定是无等待算法(当然,如果你能告诉我一个,我会很高兴)。
CAS 重试循环(即使在 LL/SC 机器上编译,见下文)在技术意义上是无锁的:只有在其他线程取得进展时才需要重试。
如果 CAS 失败,则不修改高速缓存行。在 x86 上,它会弄脏高速缓存行(MESI M 状态),但 x86 cmpxchg 不会检测到 ABA,它只会进行比较,因此另一个加载相同的线程expected
将成功。在 LL/SC 机器上,希望一个核心上的 SC 故障不会导致其他核心上出现奇怪的 SC 故障,否则它可能会活锁。你可以假设 CPU 架构师会想到这一点。
它不是无等待的,因为理论上单个线程必须重试无限次。
您的代码编译gcc8.1 -O3 -march=haswell
为这个 asm(来自 Godbolt 编译器资源管理器)
# gcc's code-gen for x86 with BMI1 looks optimal to me. No wasted instructions!
# presumably you'll get something similar when inlining.
pop_lowest_non_zero(std::atomic<unsigned long>&):
mov rax, QWORD PTR [rdi]
.L2:
blsr rdx, rax # Bit Lowest Set Reset
lock cmpxchg QWORD PTR [rdi], rdx
jne .L2 # fall through on success: cmpxchg sets ZF on equal like regular cmp
blsi rax, rax # Bit Lowest Set Isolate
ret
如果没有 BMI1,blsr 和 blsi 将分别变成两条指令。lock
考虑到ed 指令的成本,这与性能几乎无关。
不幸的是,clang 和 MSVC 稍微笨拙一些,在无争用快速路径上有一个分支。(并且clang通过剥离第一次迭代使函数膨胀。IDK如果这有助于分支预测或其他什么,或者如果它纯粹是一个错过的优化。我们可以让clang使用unlikely()
宏生成没有采用分支的快速路径。 如何Linux 内核中的可能()和不太可能()宏工作,它们有什么好处?)。
脚注1:
除非您正在为一组已知的机器构建二进制文件,否则您不能假设 BMI1 无论如何都是可用的。所以编译器需要做类似lea rdx, [rax-1]
/and rdx, rax
而不是blsr rdx, rax
. 这对这个函数有一个微不足道的区别;大部分成本是原子操作,即使在无竞争的情况下,即使对于 hot-in-L1d 缓存无争用情况,看看乱序执行吞吐量对周围代码的影响。(例如,lock cmpxchg
在 Skylake ( http://agner.org/optimize/ ) 上 10 uop 与保存 1 uopblsr
而不是其他2条指令。假设前端是瓶颈,而不是内存或其他东西。成为一个完整的内存屏障也会影响周围代码中的加载/存储,但幸运的是不会影响独立 ALU 指令的乱序执行。)
非 x86:LL/SC 机器
大多数非 x86 机器使用加载链接/存储条件重试循环来执行所有原子操作。不幸的是,C++11 不允许您创建自定义 LL/SC 操作(例如,(x-1) & x
在 LL/SC 内部,而不仅仅是您从 using 中获得的添加fetch_add
),但是 CAS 机器(如 x86)无法为您提供 LL/SC 提供的 ABA 检测。因此,尚不清楚您将如何设计一个可以在 x86 上高效编译但也可以直接编译到 ARM 和其他 LL/SC ISA 上的 LL/SC 重试循环的 C++ 类。(请参阅此讨论。)
因此,您只需要使用compare_exchange_weak
if C++ 不提供您想要的操作(例如fetch_or
or fetch_and
)来编写代码。
您从当前的编译器中获得的实践是compare_exchange_weak
使用 LL/SC 实现的,并且您的算术运算与此分开。asm 实际上在独占加载获取 ( ldaxr
) 之前执行了宽松的加载,而不是仅仅基于ldaxr
结果进行计算。在尝试存储之前,还有额外的分支来检查expected
最后一次尝试是否与新的加载结果匹配。
LL/SC 窗口可能比加载和存储之间的 2 个相关 ALU 指令短,以防万一。CPU 提前准备好所需的值,而不依赖于加载结果。(这取决于之前的加载结果。)Clang 的代码生成将sub
/and
放入循环中,但取决于之前迭代的加载,因此在乱序执行的情况下,它们仍然可以提前准备好。
但如果这真的是最有效的做事方式,编译器也应该使用它fetch_add
。他们不这样做,因为它可能不是。LL/SC 重试很少见,就像 x86 上的 CAS 重试一样。我不知道它是否会在竞争激烈的情况下有所不同。也许吧,但是编译器在编译时不会减慢优化的快速路径fetch_add
。
我重命名了您的函数并重新格式化了while()
可读性,因为它对于一行来说已经太长而没有用unlikely()
.
这个版本使用 clang 编译成可能比你的原始版本稍微好一点的 asm。我还修复了您的函数名称,因此它实际上可以编译;你的问题不匹配。我选择了与 x86 的 BMI 指令名称类似的完全不同的名称,因为它们简洁地描述了操作。
#include <atomic>
#include <cstdint>
#ifdef __GNUC__
#define unlikely(expr) __builtin_expect(!!(expr), 0)
#define likely(expr) __builtin_expect(!!(expr), 1)
#else
#define unlikely(expr) (expr)
#define likely(expr) (expr)
#endif
inline uint64_t clear_lowest_set(std::uint64_t v){
return v-1 & v;
}
inline uint64_t isolate_lowest_set(std::uint64_t v){
//return v & ~clear_lowest_set(v);
return (-v) & v;
// MSVC optimizes this better for ARM when used separately.
// The other way (in terms of clear_lowest_set) does still CSE to 2 instructions
// when the clear_lowest_set result is already available
}
uint64_t pop_lowest_non_zero(std::atomic_uint64_t& foo)
{
auto expected = foo.load(std::memory_order_relaxed);
while(unlikely(!foo.compare_exchange_weak(
expected, clear_lowest_set(expected),
std::memory_order_seq_cst, std::memory_order_relaxed)))
{}
return isolate_lowest_set(expected);
}
AArch64 的Clang -O3
(-target aarch64-linux-android -stdlib=libc++ -mcpu=cortex-a57
在 Godbolt 上)制作了一些奇怪的代码。这是来自 clang6.0,但旧版本也有奇怪之处,在寄存器中创建一个 0/1 整数,然后对其进行测试,而不是一开始就跳到正确的位置。
pop_lowest_non_zero(std::__1::atomic<unsigned long long>&): // @pop_lowest_non_zero(std::__1::atomic<unsigned long long>&)
ldr x9, [x0] @ the relaxed load
ldaxr x8, [x0] @ the CAS load (a=acquire, x=exclusive: pairs with a later stxr)
cmp x8, x9 @ compare part of the CAS
b.ne .LBB0_3
sub x10, x9, #1
and x10, x10, x9 @ clear_lowest( relaxed load result)
stlxr w11, x10, [x0] @ the CAS store (sequential-release)
cbnz w11, .LBB0_4 @ if(SC failed) goto retry loop
# else fall through and eventually reach the epilogue
# this looks insane. w10 = 0|1, then branch if bit[0] != 0. Always taken, right?
orr w10, wzr, #0x1
tbnz w10, #0, .LBB0_5 @ test-bit not-zero will always jump to the epilogue
b .LBB0_6 # never reached
.LBB0_3:
clrex @ clear the ldrx/stxr transaction
.LBB0_4:
# This block is pointless; just should b to .LBB0_6
mov w10, wzr
tbz w10, #0, .LBB0_6 # go to the retry loop, below the ret (not shown here)
.LBB0_5: # isolate_lowest_set and return
neg x8, x9
and x0, x9, x8
ret
.LBB0_6:
@ the retry loop. Only reached if the compare or SC failed.
...
所有这些疯狂的分支可能不是真正的性能问题,但很明显这可能会更有效率(即使没有教 clang 如何直接使用 LL/SC 而不是模拟的 CAS)。 报告为 clang/LLVM 错误 38173]( https://bugs.llvm.org/show_bug.cgi?id=38173 )
似乎 MSVC 不知道 AArch64 的发布存储是顺序发布的(不仅仅是像 x86 这样的常规发布),因为它dmb ish
在stlxr
. 它浪费的指令更少,但它的 x86 偏向正在显示或其他东西:像使用可能无用的重试循环compare_exchange_weak
一样编译,该循环将在 LL/SC 失败时再次尝试使用旧的预期/期望的 CAS。compare_exchange_strong
这通常是因为另一个线程修改了该行,所以预期会不匹配。(Godbolt 在任何旧版本中都没有用于 AArch64 的 MSVC,所以也许支持是全新的。这可以解释dmb
)
## MSVC Pre 2018 -Ox
|pop_lowest_non_zero| PROC
ldr x10,[x0] # x10 = expected = foo.load(relaxed)
|$LL2@pop_lowest| @ L2 # top of the while() loop
sub x8,x10,#1
and x11,x8,x10 # clear_lowest(relaxed load result)
|$LN76@pop_lowest| @ LN76
ldaxr x8,[x0]
cmp x8,x10 # the compare part of CAS
bne |$LN77@pop_lowest|
stlxr w9,x11,[x0]
cbnz w9,|$LN76@pop_lowest| # retry just the CAS on LL/SC fail; this looks like compare_exchange_strong
# fall through on LL/SC success
|$LN77@pop_lowest| @ LN77
dmb ish # full memory barrier: slow
cmp x8,x10 # compare again, because apparently MSVC wants to share the `dmb` instruction between the CAS-fail and CAS-success paths.
beq |$LN75@pop_lowest| # if(expected matches) goto epilogue
mov x10,x8 # else update expected
b |$LL2@pop_lowest| # and goto the top of the while loop
|$LN75@pop_lowest| @ LN75 # function epilogue
neg x8,x10
and x0,x8,x10
ret
AArch64 的 gcc6.3 也会生成奇怪的代码,并将其存储expected
到堆栈中。(Godbolt 没有更新的 AArch64 gcc)。
我对 AArch64 编译器对此非常不满意!IDK 为什么他们不生成像这样干净高效的东西,在快速路径上没有采用分支,并且只需要设置一些离线代码以跳回 CAS 重试。
pop_lowest_non_zero ## hand written based on clang
# clang could emit this even without turning CAS into LL/SC directly
ldr x9, [x0] @ x9 = expected = foo.load(relaxed)
.Lcas_retry:
ldaxr x8, [x0] @ x8 = the CAS load (a=acquire, x=exclusive: pairs with a later stxr)
cmp x8, x9 @ compare part of the CAS
b.ne .Lcas_fail
sub x10, x9, #1
and x10, x10, x9 @ clear_lowest (relaxed load result)
stlxr w11, x10, [x0] @ the CAS store (sequential-release)
cbnz w11, .Lllsc_fail
# LL/SC success: isolate_lowest_set and return
.Lepilogue:
neg x8, x9
and x0, x9, x8
ret
.Lcas_fail:
clrex @ clear the ldrx/stxr transaction
.Lllsc_fail:
mov x9, x8 @ update expected
b .Lcas_retry @ instead of duplicating the loop, jump back to the existing one with x9 = expected
比较.fetch_add
:
Clang 确实为fetch_add()
:
mov x8, x0
.LBB1_1:
ldxr x0, [x8] # relaxed exclusive load: I used mo_release
add x9, x0, #1
stlxr w10, x9, [x8]
cbnz w10, .LBB1_1 # retry if LL/SC failed
ret
而不是add #1
,我们想要sub x9, x8, #1
/ and x9, x9, x8
,所以我们只得到一个 LL/SC 重试循环。这节省了代码大小,但不会明显更快。在大多数情况下,可能甚至不会更快,尤其是作为整个程序的一部分,它没有大量使用。
替代方案:你甚至需要这个位图操作吗?你能分解它以减少争用吗?
您可以使用原子计数器而不是位图,并在需要时将其映射到位图吗?需要位图的操作可以将计数器映射到位图uint64_t(~0ULL) << (64-counter)
(仅适用于非零计数器)。或者也许tmp=1ULL << counter; tmp ^= tmp-1;
(即x86 xor eax,eax
/ bts rax, rdi
(rax = 1在位置0..63设置位)/ blsmsk rax, rax
(rax =设置到该位置的所有位)。嗯,仍然需要掩码= 0的特殊情况,因为有65个可能连续位掩码的状态:64 个位置之一的最高(或最低)位,或根本没有设置位。
或者,如果位图有某种模式,x86 BMI2pdep
可以将连续位分散到该模式中。使用 , 再次获取 N 个连续设置位(1ULL << counter) - 1
,计数器 < 64。
或者它必须是一个位掩码,但不需要是一个位掩码?
不知道你的用例是什么,但这种想法可能有用:
您是否需要所有线程都必须竞争的单个原子位图?也许你可以把它分成多个块,每个块都在一个单独的缓存行中。(但这使得不可能以原子方式对整个映射进行快照。)不过,如果每个线程都有一个首选块,并且总是在继续寻找另一个块中的插槽之前尝试这样做,那么您可能会减少平均情况下的争用。
在 asm 中(或在 C++ 中使用可怕的联合黑客攻击),您可以尝试通过查找要更新的 64 位整数的正确字节,然后仅lock cmpxchg
在其上更新,来减少 cmpxchg 失败而不减少争用。这实际上对这种情况没有帮助,因为看到相同整数的两个线程都会尝试清除相同的位。但它可以减少此操作与设置 qword 中其他位的操作之间的重试次数。当然,这只有在 qword 的其他字节发生变化时成功不是正确性问题时才有效。lock cmpxchg
推荐阅读
- android - 为什么我的片段没有附加到我的活动中?
- css - 使用 CSS 创建形状
- vuejs2 - CKEditor Decoupled Build Editor Area 就在 Toolbar 旁边
- python - 调用 filedialog.askdirectory() 后保持 Toplevel 高于 root
- ios - App Store Connect API/fastlane:审核开始后如何“开发者拒绝”提交
- ios - 面向儿童的 Google Ads 类别 Ios
- python - 在新目录中下载文件 - 套接字编程
- reactjs - 如何约束基于 JSX 的函数的返回类型?
- function - Kotlin - 如何仅在接收器存在时才使用接收器调用可为空的方法?
- python - 在工作人员上存储对象并执行方法