首页 > 解决方案 > 单个作者 + 阅读器的发布/检查更新类是否可以使用 memory_order_relaxed 或获取/释放以提高效率?

问题描述

介绍

我有一个小班,它利用 std::atomic 进行无锁操作。由于这个类被大量调用,它影响了性能,我遇到了麻烦。

类描述

该类类似于 LIFO,但一旦调用 pop() 函数,它只返回其环形缓冲区的最后写入元素(仅当自上次 pop() 以来有新元素时)。

一个线程调用push(),另一个线程调用pop()。

我读过的来源

由于这占用了我太多的计算机时间,我决定进一步研究 std::atomic 类及其 memory_order。我已经阅读了 StackOverflow 和其他资源和书籍中的大量 memory_order 帖子,但我无法清楚地了解不同的模式。特别是,我在获取和释放模式之间苦苦挣扎:我也无法理解为什么它们与 memory_order_seq_cst 不同。

根据我自己的研究,我认为每个记忆顺序用我的话做了什么

memory_order_relaxed:在同一个线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,它们需要一段时间才能更新。代码可以由编译器或操作系统自由地重新排序。

memory_order_acquire / release:由 atomic::load 使用。它防止在此之前的代码行重新排序(编译器/操作系统可能会在此行之后重新排序),并在此线程或另一个线程中使用memory_order_releasememory_order_seq_cst读取存储在此原子上的最新值。memory_order_release还可以在重新排序后阻止该代码。因此,在获取/释放中,两者之间的所有代码都可以由操作系统改组。我不确定这是在同一个线程之间,还是在不同线程之间。

memory_order_seq_cst:最容易使用,因为它就像我们与变量一起使用的自然写法,立即刷新其他线程加载函数的值。

LockFreeEx 类

template<typename T>
class LockFreeEx
{
public:
    void push(const T& element)
    {
        const int wPos = m_position.load(std::memory_order_seq_cst);
        const int nextPos = getNextPos(wPos);
        m_buffer[nextPos] = element;
        m_position.store(nextPos, std::memory_order_seq_cst);
    }

    const bool pop(T& returnedElement)
    {

        const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
        if (wPos != -1)
        {
            returnedElement = m_buffer[wPos]; 
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    static constexpr int maxElements = 8;
    static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
    std::array<T, maxElements> m_buffer;
    std::atomic<int> m_position {-1};
};

我如何期望它可以改进

所以,我的第一个想法是在所有原子操作中使用 memory_order_relaxed,因为 pop() 线程处于循环中,每 10-15 毫秒在 pop 函数中寻找可用更新,那么它允许在第一个 pop() 函数中失败以实现后来有一个新的更新。这只是一堆毫秒。

另一种选择是使用发布/获取 - 但我不确定它们。在所有store()中使用 release并在所有load() 函数中使用。

不幸的是,我描述的所有 memory_order 似乎都有效,如果它们应该失败,我不确定它们何时会失败。

最终的

拜托,你能告诉我你是否发现在这里使用宽松的记忆顺序有问题吗?或者我应该使用发布/获取(也许对这些的进一步解释可以帮助我)?为什么?

我认为放松对这个类来说是最好的,在它的所有 store() 或 load() 中。但我不确定!

谢谢阅读。

编辑:额外说明:

由于我看到每个人都在要求'char',所以我将其更改为 int,问题解决了!但这不是我要解决的问题。

正如我之前所说,这个类很可能是 LIFO,但只有最后一个被推送的元素才重要,如果有的话。

我有一个大结构 T(可复制和可分配),我必须以无锁方式在两个线程之间共享。所以,我知道的唯一方法是使用一个循环缓冲区来写入 T 的最后一个已知值,以及一个知道最后写入值的索引的原子。如果没有,索引将为-1。

请注意,我的推送线程必须知道何时有“新 T”可用,这就是 pop() 返回布尔值的原因。

再次感谢所有试图帮助我处理记忆订单的人!:)

阅读解决方案后:

template<typename T>
class LockFreeEx
{
public:
    LockFreeEx() {}
    LockFreeEx(const T& initValue): m_data(initValue) {}

    // WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
    void publish(const T& element)
    {
        // I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
        const int wPos = m_writePos.load(std::memory_order_acquire);
        const int nextPos = (wPos + 1) % bufferMaxSize;
        m_buffer[nextPos] = element;
        m_writePos.store(nextPos, std::memory_order_release);
    }


    // READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
    inline void update() 
    {
        // should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
        const int writeIndex = m_writePos.load(std::memory_order_acquire); 
        // Updating only in case there is something new... T may be a heavy struct
        if (m_readPos != writeIndex)
        {
            m_readPos = writeIndex;
            m_data = m_buffer[m_readPos];
        }
    }
    // NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
    inline const T& get() const noexcept {return m_data;}

private:
    // Buffer
    static constexpr int bufferMaxSize = 4;
    std::array<T, bufferMaxSize> m_buffer;

    std::atomic<int> m_writePos {0};
    int m_readPos = 0;

    // Data
    T m_data;
};

标签: c++atomiclock-freememory-barriersstdatomic

解决方案


内存顺序与您何时看到对原子对象的某些特定更改无关,而是与此更改可以保证周围代码有关的内容有关。宽松的原子保证除了对原子对象本身的更改之外没有任何保证:更改将是原子的。但是您不能在任何同步上下文中使用宽松的原子。

你有一些需要同步的代码。您想要弹出已推送的内容,而不是尝试弹出尚未推送的内容。因此,如果您使用轻松的操作,则无法保证您的 pop 会看到此推送代码:

m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);

正如它所写的那样。它也可以这样看:

m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;

因此,您可能会尝试从缓冲区中获取一个尚不存在的元素。因此,您必须使用一些同步并至少使用获取/释放内存顺序。


和你的实际代码。我认为顺序可以如下:

const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);

推荐阅读