首页 > 解决方案 > C++11无锁序列号生成器安全吗?

问题描述

目标是在现代 C++ 中实现一个序列号生成器。上下文处于并发环境中。

要求 #1类必须是单例的(所有线程通用)

要求 #2用于数字的类型是 64 位整数。

要求#3来电者可以请求多个号码

要求#4此类将在能够为调用提供服务之前缓存一系列数字。因为它缓存了一个序列,所以它还必须存储上限 -> 能够返回的最大数量。

要求 #5最后但并非最不重要的一点是,在启动时(构造函数)并且当没有可用的数字可提供( n_requested > n_avalaible )时,单例类必须查询数据库以获取新序列。从 DB 加载,更新 seq_n_ 和 max_seq_n_。

其界面的简要草案如下:

class singleton_sequence_manager {

public:

    static singleton_sequence_manager& instance() {
        static singleton_sequence_manager s;
        return s;
    }

    std::vector<int64_t> get_sequence(int64_t n_requested);

private:
    singleton_sequence_manager(); //Constructor
    void get_new_db_sequence(); //Gets a new sequence from DB

    int64_t seq_n_;
    int64_t max_seq_n_;
}

示例只是为了阐明用例。假设在启动时,DB 将 seq_n_ 设置为 1000,将 max_seq_n_ 设置为 1050:

get_sequence.(20); //Gets [1000, 1019]
get_sequence.(20); //Gets [1020, 1039]
get_sequence.(5); //Gets [1040, 1044]
get_sequence.(10); //In order to serve this call, a new sequence must be load from DB

显然,使用锁和 std::mutex 的实现非常简单。

我感兴趣的是使用 std::atomic 和原子操作实现无锁版本。

我的第一次尝试如下:

int64_t seq_n_;
int64_t max_seq_n_;

改为:

std::atomic<int64_t> seq_n_;
std::atomic<int64_t> max_seq_n_;

从 DB 获取新序列只需在原子变量中设置新值:

void singleton_sequence_manager::get_new_db_sequence() {
    //Sync call is made to DB
    //Let's just ignore unhappy paths for simplicity
    seq_n_.store( start_of_seq_got_from_db );
    max_seq_n_.store( end_of_seq_got_from_db );
    //At this point, the class can start returning numbers in [seq_n_ : max_seq_n_]
}

现在使用原子比较和交换技术的 get_sequence 函数:

std::vector<int64_t> singleton_sequence_manager::get_sequence(int64_t n_requested) {

    bool succeeded{false};
    int64_t current_seq{};
    int64_t next_seq{};

    do {

        current_seq = seq_n_.load();
        do {
            next_seq = current_seq + n_requested + 1;
        }
        while( !seq_n_.compare_exchange_weak( current_seq, next_seq ) );
        //After the CAS, the caller gets the sequence [current_seq:next_seq-1]

        //Check if sequence is in the cached bound.
        if( max_seq_n_.load() > next_seq - 1 )
            succeeded = true;
        else //Needs to load new sequence from DB, and re-calculate again
            get_new_db_sequence();

    }        
    while( !succeeded );

    //Building the response        
    std::vector<int64_t> res{};
    res.resize(n_requested);
    for(int64_t n = current_seq ; n < next_seq ; n++)
        res.push_back(n);

    return res;
}

想法:

标签: c++c++11thread-safetyatomiccompare-and-swap

解决方案


您的无锁版本有一个根本缺陷,因为它将两个独立的原子变量视为一个实体。由于写入seq_n_max_seq_n_是单独的语句,因此它们可以在执行期间分开,从而导致其中一个与另一个配对时的值不正确。

例如,一个线程可以通过 CAS 内部 while 循环(n_requested对于当前缓存的序列来说太大了),然后在检查它是否被缓存之前被挂起。第二个线程可以通过并将max_seq_n值更新为更大的值。然后第一个线程恢复并通过max_seq_n检查,因为该值已由第二个线程更新。它现在使用无效序列。

get_new_db_sequence在两次store通话之间可能会发生类似的事情。

由于您正在写入两个不同的位置(即使在内存中相邻),并且它们无法自动更新(由于 128 位的组合大小不是您的编译器支持的原子大小),因此写入必须受到保护互斥体。

自旋锁应该只用于非常短的等待,因为它确实会消耗 CPU 周期。典型的用法是使用短自旋锁,如果资源仍然不可用,则使用更昂贵的东西(如互斥锁)来等待使用 CPU 时间。


推荐阅读