首页 > 解决方案 > 避免 SPSC 队列索引的错误共享

问题描述

让我们想象一个无锁并发 SPSC(单生产者/单消费者)队列。

注意,它cached_tail只能被生产者线程访问,就像cached_head只能被消费者线程访问一样。它们可以被认为是私有线程局部变量,因此它们是不同步的,因此没有定义为原子的。

队列的数据布局如下:

#include <atomic>
#include <cstddef>
#include <thread>

struct spsc_queue
{
    /// ...

    // Producer variables
    alignas(std::hardware_destructive_interference_size) std::atomic<size_t> head; // shared
    size_t cached_tail; // non-shared

    // Consumer variables
    alignas(std::hardware_destructive_interference_size) std::atomic<size_t> tail; // shared
    size_t cached_head; // non-shared

    std::byte padding[std::hardware_destructive_interference_size - sizeof(tail) - sizeof(cached_head)];
};

由于我想避免错误共享,因此我与headL1tail缓存行大小对齐。

push/操作的伪代码实现pop如下:

bool push(const void* elems, size_t n)
{
    size_t h = atomic_load(head, relaxed);

    if (num_remaining_storage(h, cached_tail) < n)
    {
        cached_tail = atomic_load(tail, acquire);

        if (num_remaining_storage(h, cached_tail) < n)
            return false;
    }

    // write from elems

    atomic_store(head, h + n, release);
    return true;
}

bool pop(void* elems, size_t n)
{
    size_t t = atomic_load(tail, relaxed);

    if (num_stored_elements(cached_head, t) < n)
    {
        cached_head = atomic_load(head, acquire);

        if (num_stored_elements(cached_head, t) < n)
            return false;
    }

    // read to elems

    atomic_store(tail, t + n, release);
    return true;
}

void wait_and_push(const void* elems, size_t n)
{
    size_t h = atomic_load(head, relaxed);

    while (num_remaining_storage(h, cached_tail) < n)
        cached_tail = atomic_load(tail, acquire);

    // write from elems

    atomic_store(head, h + n, release);
}

void wait_and_pop(void* elems, size_t n)
{
    size_t t = atomic_load(tail, relaxed);

    while (num_stored_elements(cached_head, t) < n)
        cached_head = atomic_load(head, acquire);

    // write to elems

    atomic_store(tail, t + n, release);
}

在初始化时(此处未列出),所有索引都设置为0. 函数num_remaining_storagenum_stored_elementsconst基于传递的参数和不可变队列容量执行简单计算的函数 - 它们不执行任何原子读取或写入。

现在的问题是:我是否需要对齐cached_tail以及cached_head完全避免错误共享任何索引,或者它是可以的。由于cached_tail生产者是私有的,并且cached_head是消费者私有的,我认为cached_tail可以与head(生产者缓存行)cached_head在同一缓存行中,就像与tail(消费者缓存行)在同一缓存行中一样,不会发生错误共享。

我错过了什么吗?

标签: c++concurrencyatomiclock-freefalse-sharing

解决方案


感谢您提供伪代码 - 它仍然缺少一些细节,但我想我明白了基本的想法。您有一个有界 SPSC 队列,其中索引可以环绕,并且您使用cached_tail变量 inpush检查是否有空闲插槽,因此您可以避免tail从可能无效的缓存行加载(反之亦然pop)。

我建议将headand放在cached_tail彼此旁边(即,在同一缓存行上),tail并且cached_head放在不同的缓存行上。push 总是读取两个变量 -headcached_tail,因此将它们靠近在一起是有意义的。cached_tail只有在没有更多空闲插槽并且我们必须重新加载时才会更新tail

您的代码在细节上有点薄,但似乎还有一些优化空间:

bool push(const void* elems, size_t n)
{
    size_t h = atomic_load(head);

    if (num_remaining_storage(h, cached_tail) < n)
    {
        auto t = atomic_load(tail);
        if (t == cached_tail)
          return false;

       // we only have to update our cached_tail if the reloaded value
       // is different - and in this case it is guaranteed that there
       // is a free slot, so we don't have to perform a recheck.
        cached_tail = t;
    }

    // write from elems

    atomic_store(head, h + n);
    return true;
}

这种方式cached_tail只有在更新时head才会更新,因此这是它们位于同一缓存行上的另一个原因。当然,同样的优化也可以应用pop

这正是我想看一些代码的原因,因为访问模式对于确定哪些变量应该共享一个缓存行和哪些不应该是至关重要的。


推荐阅读