首页 > 解决方案 > Intel-64 和 ia32 原子操作获取-释放语义和 GCC 5+

问题描述

我正在研究我的 Haswell CPU(4/8 核 2.3-3.9ghz i7-4790M)上的 Intel CPU atomic features,并且发现很难构建例如。可靠的 mutex_lock() 和 mutex_unlock() 操作,例如 GCC 手册所建议的:

6.53 x86 特定于事务内存的内存模型扩展

x86 架构支持额外的内存排序标志来标记硬件锁省略的锁临界区。除了原子内在函数的现有内存模型之外,还必须指定这些。

 '__ATOMIC_HLE_ACQUIRE'
 Start lock elision on a lock variable.  Memory model must be
 '__ATOMIC_ACQUIRE' or stronger.
 '__ATOMIC_HLE_RELEASE'
 End lock elision on a lock variable.  Memory model must be
 '__ATOMIC_RELEASE' or stronger.

当锁获取失败时,需要良好的性能来快速中止事务。这可以通过“_mm_pause”来完成

 #include <immintrin.h> // For _mm_pause

 int lockvar;

 /* Acquire lock with lock elision */
 while (__atomic_exchange_n(&lockvar, 1, 
     __ATOMIC_ACQUIRE|__ATOMIC_HLE_ACQUIRE))
     _mm_pause(); /* Abort failed transaction */
 ...
 /* Free lock with lock elision */
 __atomic_store_n(&lockvar, 0, __ATOMIC_RELEASE|__ATOMIC_HLE_RELEASE);

因此,阅读英特尔软件开发人员手册第 3 卷第 8.1 节“锁定原子操作”,特别是第 8.1.4 节“锁定操作对内部处理器缓存的影响”后,我实现了我的测试 mutex_lock() mutex_unlock () 起初喜欢:

... static inline attribute((always_inline,const)) bool ia64_has_clflush(void) { register unsigned int ebx=0; asm volatile ( "MOV $7, %%eax\n\t" "MOV $0, %%ecx\n\t" "CPUID\n\t" "MOV %%ebx, %0\n\t" : "=r" (ebx) : : "%eax", "%ecx", "%ebx" ); return ((ebx & (1U<<23)) ? true : false); }

#define _LD_SEQ_CST_ __ATOMIC_SEQ_CST
#define _ST_SEQ_CST_ __ATOMIC_SEQ_CST
#define _ACQ_SEQ_CST_ (__ATOMIC_SEQ_CST|__ATOMIC_HLE_ACQUIRE)
#define _REL_SEQ_CST_ (__ATOMIC_SEQ_CST|__ATOMIC_HLE_RELEASE)

static bool has_clflush=false;
static
void init_has_clflush(void)
{ has_clflush = ia64_has_clflush();
}
static
void init_has_clflush(void) __attribute__((constructor));

static inline __attribute__((always_inline))
void mutex_lock( register _Atomic int *ua )
{ // the SDM states that memory to be used as semaphores
  // should not be in the WB cache memory, but nearest we
  // can get to uncached memory is to explicitly un-cache it:
  if(has_clflush)
    asm volatile
    ( "CLFLUSHOPT (%0)"
      :: "r" (ua)
    );
    // why isn't the cache flush enough?
    else
      asm volatile
      ( "LFENCE" :: );
      register unsigned int x;
      x = __atomic_sub_fetch( ua, 1, _ACQ_SEQ_CST_);
      _mm_pause();
    if(has_clflush)
      asm volatile
      ( "CLFLUSHOPT (%0)"
       :: "r" (ua)
      );
    else
      asm volatile
      ( "SFENCE" :: );
  while((x = __atomic_load_n(ua,_LD_SEQ_CST_)) != 0)
    switch(syscall( SYS_futex, ua, FUTEX_WAIT, x, nullptr,nullptr,0))
    {case 0:
      break;
     case -1:
      switch( errno )
      { case EINTR:
        case EAGAIN:
         continue;
        default:
         fprintf(stderr,"Unexpected futex error: %d : '%s'.", errno,   
              strerror(errno));
        return;
      }
    }
  }

  static inline __attribute__((always_inline))
  void mutex_unlock( register _Atomic int *ua )
  { if(has_clflush)
      asm volatile
      ( "CLFLUSHOPT (%0)"
      :: "r" (ua)
      );
    else
      asm volatile( "LFENCE" :: );
    register unsigned int x;
    x = __atomic_add_fetch( ua, 1, _REL_SEQ_CST_);
    _mm_pause();
    if(has_clflush)
      asm volatile
      ( "CLFLUSHOPT (%0)"
        :: "r" (ua)
      );
    else
      asm volatile ( "SFENCE" :: );
    if(x == 0)
      while( (1 < syscall( SYS_futex, ua, FUTEX_WAKE, 1,
           nullptr,nullptr,0)) && (errno == EINTR));
  }

现在,有趣的是,关键的 mutex_lock() 减法和 mutex_unlock() 加法操作最终成为指令:

互斥锁:

# 61 "intel_lock1.c" 1
    CLFLUSHOPT (%rbx)
# 0 "" 2
#NO_APP
.L7:
    lock xacquire subl  $1, lck(%rip)
    rep nop
    cmpb    $0, has_clflush(%rip)
    je  .L8
#APP
# 72 "intel_lock1.c" 1
    CLFLUSHOPT (%rbx)
# 0 "" 2

互斥锁:

#APP
# 98 "intel_lock1.c" 1
    CLFLUSHOPT (%rbx)
# 0 "" 2
#NO_APP
.L24:
    movl    $1, %eax
    lock xacquire xaddl %eax, lck(%rip)
    rep nop
    addl    $1, %eax
    cmpb    $0, has_clflush(%rip)
    je  .L25
#APP
# 109 "intel_lock1.c" 1
    CLFLUSHOPT (%rbx)
# 0 "" 2
#NO_APP

但是这个实现似乎需要 LFENCE / SFENCE 可靠地运行(CLFLUSHOPT 是不够的),否则两个线程可能最终在 futex() 中死锁,锁定值相同 -1 。

从阅读英特尔文档中我看不到两个线程进入指令序列是如何发生的:

# %rbx == $lck
CLFLUSHOPT (%rbx)
lock xacquire subl  $1, lck(%rip)
rep nop

如果 *lck 为 0 ,则两者都可以在 *lck 中得到结果 '-1' ;肯定一个线程必须得到 -1 而另一个线程必须得到 -2 吗?

但 strace 说不:

strace: Process 11978 attached with 2 threads
[pid 11979] futex(0x60209c, FUTEX_WAIT, 4294967295, NULL <unfinished ...>
[pid 11978] futex(0x60209c, FUTEX_WAIT, 4294967295, NULL^C

这是僵局的情况。我哪里做错了 ?

请那里的任何英特尔 CPU 锁定和缓存专家解释如何在同一未缓存位置 *lck 的两个原子减量或增量都断言 #LOCK 总线信号(独占总线访问)和 XACQUIRE 最终会在 *lck 中得到相同的结果?

我认为这就是#LOCK 前缀(和 HLE)的目的?我已经尝试不使用 HLE 并且只使用 __ATOMIC_SEQ_CST 进行所有访问(这只是添加了 LOCK 前缀,而不是 XACQUIRE)但它没有区别 - 没有 {L,S}FENCE-es 仍然会导致死锁。

我已经阅读了 Ulrich Drepper 的优秀论文 [Futexes are Tricky]:http ://www.akkadia.org/drepper/futex.pdf ,但他提出了一个仅将硬编码常量写入锁定内存的互斥锁实现。我明白为什么了。很难让互斥锁在服务员计数或对锁定值上进行的任何算术运算中可靠地工作。有没有人找到方法来做可靠的锁定算法,使得结果适合 x86_64 Linux 上的锁定/信号量值?最有兴趣讨论它们...

因此,在调查了 HLE 和 CLFLUSH 的一些死胡同之后,我能够到达的唯一工作版本的锁定/解锁使用硬编码常量和 __atomic_compare_exchange_n - 测试程序的完整源代码,它增加了一个计数器(没有锁定) 直到收到 + / 退出信号,位于:

工作示例:intel_lock3.c

[]:https ://drive.google.com/open?id=1ElB0qmwcDMxy9NBYkSXVxljj5djITYxa

enum LockStatus
{ LOCKED_ONE_WAITER = -1
, LOCKED_NO_WAITERS = 0
, UNLOCKED=1
};

static inline __attribute__((always_inline))
bool mutex_lock( register _Atomic int *ua )
{ register int x;
  int cx;
 lock_superceded:
  x  = __atomic_load_n( ua, _LD_SEQ_CST_ );
  cx = x;
  x = (x == UNLOCKED)
       ? LOCKED_NO_WAITERS
       : LOCKED_ONE_WAITER;
  if (! __atomic_compare_exchange_n
      ( ua, &cx, x, false, _ACQ_SEQ_CST_,  _ACQ_SEQ_CST_) )
    goto lock_superceded;
  if( x == LOCKED_ONE_WAITER )
  { do{
    switch(syscall( SYS_futex, ua, FUTEX_WAIT, x, nullptr,nullptr,0))
    {case 0:
      break;
     case -1:
      switch( errno )
      { case EINTR:
         return false;
        case EAGAIN:
          break;
        default:
          fprintf(stderr,"Unexpected futex WAIT error: %d : '%s'.",
                  errno, strerror(errno));
          return false;
       }
    }
    x = __atomic_load_n(ua,_LD_SEQ_CST_);
    } while(x < 0);
  }
  return true;
}

static inline __attribute__((always_inline))
bool mutex_unlock( register _Atomic int *ua )
{ register int x;
  int cx;
 unlock_superceded:
  x  = __atomic_load_n( ua, _LD_SEQ_CST_ );
  cx = x;
  x = (x == LOCKED_ONE_WAITER)
       ? LOCKED_NO_WAITERS
       : UNLOCKED;
  if (! __atomic_compare_exchange_n
       ( ua, &cx, x, false, _ACQ_SEQ_CST_,  _ACQ_SEQ_CST_) )
    goto unlock_superceded;
    if(x == LOCKED_NO_WAITERS)
    { while((1 < 
             syscall( SYS_futex, ua, FUTEX_WAKE, 1, nullptr,nullptr,0))
         ||( UNLOCKED != __atomic_load_n( ua, _LD_SEQ_CST_ ))
         ) // we were a waiter, so wait for locker to unlock !
      { if( errno != 0 )
          switch(errno)
          {case EINTR:
            return false;
           case EAGAIN:
            break;
           default:
            fprintf(stderr,
                  "Unexpected futex WAKE error: %d : '%s'.", 
                  errno, strerror(errno));
            return false;
          }
      }
   }
   return true;
 }

 Build & Test (GCC 7.3.1 & 6.4.1 & 5.4.0) used:
 $ gcc -std=gnu11 -march=x86-64 -mtune=native -D_REENTRANT \
   -pthread -Wall -Wextra -O3 -o intel_lock3 intel_lock3.c

 $ ./intel_lock3
 # wait a couple of seconds and press ^C
 ^C59362558

使用算术破解的版本:

https://drive.google.com/open?id=10yLrohdKLZT4p3G1icFHdjF5eHY68Yws

编译例如:

$ gcc -std=gnu11 -march=x86_64 -mtune=native -O3 -Wall -Wextra 
  -o intel_lock2 intel_lock2.c
$ ./intel_lock2
# wait a couple of seconds and press ^C
$ ./intel_lock2
^Cwas locked!
446

它不应该打印“被锁定!” 并且在几秒钟内应该超过了@ 5e8 : 5x10^8 的计数,而不是 446。

使用 strace 运行显示有两个线程正在阻塞等待 -1 的锁定值变为 0 :

$ strace -f -e trace=futex ./intel_lock2
strace: Process 14481 attached
[pid 14480] futex(0x602098, FUTEX_WAIT, 4294967295, NULL <unfinished ...>
[pid 14481] futex(0x602098, FUTEX_WAKE, 1 <unfinished ...>
[pid 14480] <... futex resumed> )       = -1 EAGAIN (Resource temporarily
                                          unavailable)
[pid 14481] <... futex resumed> )       = 0
[pid 14480] futex(0x602098, FUTEX_WAKE, 1 <unfinished ...>
[pid 14481] futex(0x602098, FUTEX_WAIT, 4294967295, NULL <unfinished ...>
[pid 14480] <... futex resumed> )       = 0
[pid 14481] <... futex resumed> )       = -1 EAGAIN (Resource temporarily
                                          unavailable)
[pid 14480] futex(0x602098, FUTEX_WAIT, 4294967295, NULL <unfinished ...>
[pid 14481] futex(0x602098, FUTEX_WAIT, 4294967295, NULL^C <unfinished  
...>
[pid 14480] <... futex resumed> )       = ? ERESTARTSYS (To be restarted 
if SA_RESTART is set)
strace: Process 14480 detached
strace: Process 14481 detached
was locked!
7086

$

通常, WAIT 应该在 WAKE 之前安排,但不知何故 GCC 将内存排序语义解释为意味着 WAKE 总是在任何 WAIT 之前被安排;但即使发生这种情况,代码也应该会被延迟,并且永远不会导致两个线程在进入 futex(...FUTEX_WAIT..) 时获得 -1 lck 值。

当两个线程都获得 (-1,-1) 时,几乎相同的算法在锁定值上使用算术总是死锁 - 请注意,任何线程都不会看到 -2 值:

static inline __attribute__((always_inline))
bool mutex_lock( register _Atomic volatile int *ua )
{ register int x;
  x = __atomic_add_fetch( ua, -1, _ACQ_SEQ_);
  if( x < 0 )
  { do{
    // here you can put:
    // if( x == -2) { .. NEVER REACHED! }
    switch(syscall( SYS_futex, ua, FUTEX_WAIT, x, nullptr,nullptr,0))
    {case 0:
      break;
     case -1:
      switch( errno )
      { case EINTR:
         return false; // interrupted - user wants to exit?
        case EAGAIN:
          break;
        default:
          fprintf(stderr,"Unexpected futex WAIT error: %d : '%s'.",
                  errno, strerror(errno));
          return false;
       }
    }
    x = __atomic_load_n(ua,_LD_SEQ_);
    } while(x < 0);
  }
  return true;
}

static inline __attribute__((always_inline))
bool mutex_unlock( register _Atomic volatile int *ua )
{ register int x;
  x = __atomic_add_fetch( ua, 1, _REL_SEQ_);
  if(x == 0) // there was ONE waiter
     while(  (1 < 
             syscall( SYS_futex, ua, FUTEX_WAKE, 1, nullptr,nullptr,0)
             )
           ||(1 < __atomic_load_n(ua, _LD_SEQ_)
             ) // wait for first locker to unlock
           ) 
     { if( errno != 0 )
         switch(errno)
         {case EINTR:
           return false;
          case EAGAIN:
           break;
          default:
           fprintf(stderr,"Unexpected futex WAKE error: %d : '%s'.", 
                  errno, strerror(errno));
           return false;
         }
       }
     return true;
   }

所以,我认为如果算术运算按预期工作,即。被序列化和原子化,那么上面的代码就不会死锁;算术应该生成与工作示例中使用的 LockStatus 枚举值相同的数字。

但是算术出了点问题,现在产生了指令:

互斥锁:

movl    $-1, %eax
lock xaddl  %eax, (%rdx)

互斥锁:

movl    $1, %eax
lock xaddl  %eax, (%rdx)

代码本身没有插入栅栏,但每个 __atomic_store_n(ua,...) 都会生成一个 .

AFAICS,没有导致两个线程获得相同 -1 值的代码的有效时间表。

所以我的结论是,在算术指令上使用 intel LOCK 前缀是不安全的,并且会在用户模式 ​​Linux x86_64 gcc 编译程序中引入错误行为 - 仅将常量值从文本内存写入数据内存是原子的,并且在 Intel Haswell i7 上按顺序排序-4790M 具有 gcc 和 Linux 的平台,并且这些平台上的算术不能通过使用 HLE / XACQUIRE、锁定前缀或 FENCE 指令的任何组合来进行原子和顺序排序。

我的预感是分支预测以某种方式失败并添加了额外的算术运算/无法在此平台上执行算术运算,并在不同的物理内核上声明了 LOCK 前缀和多个线程。因此,所有带有 LOCK 前缀的算术运算都是可疑的,应该避免。

标签: linuxgccx86-64atomicfutex

解决方案


lock subl $1, (%rdi)或者在所有情况下lock xaddl %eax, (%rdx)都是 100% 原子的,即使指针未对齐(但在这种情况下要慢得多),并且是完整的内存屏障。在可缓存内存上,不会有任何外部#LOCK总线信号;内部实现只是将高速缓存行锁定在运行locked 指令的内核内的 MESI 的 M 状态。请参阅“int num”的 num++ 是否是原子的?更多细节。

如果您的测试发现它不是原子的,那么您的硬件已损坏或您的测试已损坏。发现死锁告诉您设计中存在错误,而不是您的原子原始构建块不是原子的。您可以通过使用两个线程来增加一个共享计数器来非常轻松地测试原子增量,并注意不会丢失任何计数。与不使用 的addl $1, shared(%rip)情况不同lock,您会看到丢失的计数。

此外,lfencesfencepause在正常情况下对正确性没有影响(没有 NT 存储,并且仅使用 WB(回写)内存)。如果您的任何 fence / clflush 东西有帮助,那只是在某处添加额外的延迟,这可能会使该线程在您的测试中总是输掉比赛,而不是实际上使其安全。 mfence是唯一重要的栅栏,阻止 StoreLoad 重新排序和存储转发效果。(这就是为什么 gcc 使用它作为实现 seq-cst 存储的一部分)。

在您考虑搞乱 HLE / 事务内存之前,获得一个可以正常工作的基本版本。


获取锁的第一个版本中的竞争条件

x = __atomic_sub_fetch( ua, 1, _ACQ_SEQ_CST_); 是原子的,只有一个线程lock sub可以ua0to-1和 get x=-1 from there改变。

但是你没有使用sub_fetch结果,你正在做另一个负载
while((x = __atomic_load_n(ua,_LD_SEQ_CST_)) != 0)

因此,另一个线程可以查看ua=-1第一个线程是否锁定,然后lock sub在第二个线程中的负载之间解锁。

调用它的原因sub_fetch是它原子地返回旧值,以及原子地修改内存中的值。您丢弃结果的事实sub_fetch是它可以编译为lock sub的原因,而不是lock xadd使用寄存器保存-1

(或者智能编译器可以将其编译为lock sub并检查 ZF,因为您可以从设置的标志中判断值何时变为非零或负数lock sub。)


请参阅C 和低级信号量实现,了解没有回退到操作系统辅助睡眠/唤醒的简单信号量。它在负载上旋转,直到我们看到大于 0 的值,然后尝试使用 C11 获取锁fetch_add(-1)

但是,如果它在与另一个线程的竞争中输了,它就会撤消减量。

这可能是一个糟糕的设计;最好尝试使用 a 进行减量lock cmpxchg,因此失败的线程不必撤消其减量。


我没有使用过 HLE,但我认为这个 bug 也会破坏你的 HLE 锁定。

您不需要 SFENCE、LFENCE 或 CLFLUSH[OPT] 或任何东西。 lock xadd在任何内存类型(包括 WB)上,它本身已经是一个完整的内存屏障和 100% 原子。

如果您认为 SDM 说您应该避免 WB 内存用于互斥锁/信号量,那么您可能误读了 SDM。


您在唤醒期间还有一个可能导致死锁的竞赛窗口

此代码mutex_lock看起来已损坏/容易竞争

x = __atomic_sub_fetch( ua, 1, _ACQ_SEQ_CST_);  // ok, fine
_mm_pause();   // you don't want a pause on the fast path.

if( x < 0 )   // just make this a while(x<0) loop
do {
   futex(..., FUTEX_WAIT, ...);

   x = __atomic_load_n(ua,_LD_SEQ_CST_);        // races with lock sub in other threads.
} while(x < 0);

给定线程 Afutexlck == -1(如果可能的话?):

  • 线程 B 解锁,导致lck == 0,并调用 futex(FUTEX_WAKE)
  • 线程 A 唤醒,futex 返回 whilelck仍为 0
  • 其他一些线程(B 或第三个线程)进入mutex_lock并运行__atomic_sub_fetch( ua, 1, _ACQ_SEQ_CST_);,离开lck == -1
  • 线程 Ax = __atomic_load_n(ua,_LD_SEQ_CST_);在其循环底部运行并看到-1

现在您有 2 个线程卡在 futex 等待循环中,实际上没有线程获得互斥锁/进入临界区。


我认为如果你的设计依赖于在 futex 返回后进行加载,那么你的设计就会被破坏

手册页futex(2)示例fwait()显示它在返回后futex返回,无需再次加载。

futex()是一个原子比较和阻塞操作。您的设计将您的计数器值更改为-1如果一个线程正在等待锁定而第三个线程尝试获取它。因此,您的设计可能适用于 2 个线程,但不适用于 3 个线程。

使用原子 CAS 进行递减可能是个好主意,因此您实际上永远不会更改lck-1或降低,并且futex可以保持阻塞状态。

然后,如果您可以指望它只唤醒 1,那么您是否也可以相信它的返回值意味着您确实拥有锁而没有容易竞争的单独负载。我认为。


推荐阅读