c++ - C++ 无锁队列
问题描述
我设计了这个函数,用来实现Lock-Free队列,但是在实际执行过程中(dequeue)存在死锁问题。我检查了很多次,我认为这很好。我在 x86 平台上运行,有 12 个线程可以读写。
现在我想弄清楚是什么导致了这种情况,我想知道这是否是线程安全的设计,或者它需要在哪里继续优化以获得更高的性能。
12 个线程出队和 12 个线程入队。
开发工具:Visual Studio 2019
非常期待您的回复。感谢你。
#include <iostream>
#include <functional>
#include<atomic>
#include<cassert>
#include<thread>
#include<vector>
template<typename T>
class mpmc_queue_t
{
public:
mpmc_queue_t(size_t size) :
_size(size),
_mask(_size - 1),
_buffer((node_t*)(new aligned_node_t[_size]))
{
assert((_size != 0) && ((_size & (~_size + 1)) == _size));
_read.store(0, std::memory_order_relaxed);
_write.store(0, std::memory_order_relaxed);
for (size_t i = 0; i < _size; ++i)
{
_buffer[i].status.store(false, std::memory_order_relaxed);
}
}
~mpmc_queue_t()
{
delete[] _buffer;
}
bool enqueue(const T& data)
{
auto write = _write.fetch_add(1, std::memory_order_relaxed);
node_t* node = &_buffer[write & _mask];
while (true)
{
if (!node->status.load(std::memory_order_acquire))
{
node->data = data;
node->status.store(true, std::memory_order_release);
return true;
}
std::this_thread::yield();
}
}
bool dequeue(T& data)
{
auto read = _read.fetch_add(1, std::memory_order_relaxed);
node_t* node = &_buffer[read & _mask];
while (true)
{
if (node->status.load(std::memory_order_acquire))
{
data = node->data;
node->status.store(false, std::memory_order_release);
return true;
}
std::this_thread::yield();
}
}
private:
struct node_t
{
T data;
std::atomic_bool status;
};
typedef typename std::aligned_storage<sizeof(node_t), std::alignment_of<node_t>::value>::type aligned_node_t;
typedef char cache_line_pad_t[64];
cache_line_pad_t _pad0;
size_t _size;
size_t _mask;
node_t* const _buffer;
cache_line_pad_t _pad1;
std::atomic_size_t _read;
cache_line_pad_t _pad2;
std::atomic_size_t _write;
cache_line_pad_t _pad3;
};
#define COUNT 100000000
#define THREAD 12
typedef mpmc_queue_t<size_t> queue_t;
template<typename T>
void consumer_func(T* queue)
{
size_t count = COUNT;
size_t value = 0;
while (count > 0) {
if (queue->dequeue(value)) {
--count;
}
}
std::cout << "consumer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;
}
template<typename T>
void producer_func(T* queue)
{
size_t count = COUNT;
while (count > 0) {
if (queue->enqueue(count)) {
--count;
}
}
std::cout << "producer_func ID: " << std::this_thread::get_id() << " ok" << std::endl;
}
template<typename T>
long double
run_test(
T producer_func,
T consumer_func)
{
typedef std::chrono::high_resolution_clock clock_t;
typedef std::chrono::time_point<clock_t> time_t;
time_t start;
time_t end;
start = clock_t::now();
std::thread producer0(producer_func);
std::thread producer1(producer_func);
std::thread producer2(producer_func);
std::thread producer3(producer_func);
std::thread producer4(producer_func);
std::thread producer5(producer_func);
std::thread producer6(producer_func);
std::thread producer7(producer_func);
std::thread producer8(producer_func);
std::thread producer9(producer_func);
std::thread producer10(producer_func);
std::thread producer11(producer_func);
std::thread consumer0(consumer_func);
std::thread consumer1(consumer_func);
std::thread consumer2(consumer_func);
std::thread consumer3(consumer_func);
std::thread consumer4(consumer_func);
std::thread consumer5(consumer_func);
std::thread consumer6(consumer_func);
std::thread consumer7(consumer_func);
std::thread consumer8(consumer_func);
std::thread consumer9(consumer_func);
std::thread consumer10(consumer_func);
std::thread consumer11(consumer_func);
producer0.join();
producer1.join();
producer2.join();
producer3.join();
producer4.join();
producer5.join();
producer6.join();
producer7.join();
producer8.join();
producer9.join();
producer10.join();
producer11.join();
consumer0.join();
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
consumer5.join();
consumer6.join();
consumer7.join();
consumer8.join();
consumer9.join();
consumer10.join();
consumer11.join();
end = clock_t::now();
return
(end - start).count()
* ((double)std::chrono::high_resolution_clock::period::num
/ std::chrono::high_resolution_clock::period::den);
}
int main()
{
{
queue_t queue(65536);
long double seconds = run_test(std::bind(&producer_func<queue_t>, &queue),
std::bind(&consumer_func<queue_t>, &queue));
std::cout << "The control group completed "
<< COUNT * THREAD
<< " iterations in "
<< seconds
<< " seconds. "
<< ((long double)COUNT * THREAD / seconds) / 1000000
<< " million enqueue/dequeue pairs per second."
<< std::endl;
}
return 0;
}
解决方案
这种设计不是无锁的,而是“无锁”的,因为出队中的线程可能必须等待对该项目的入队操作完成(通过 发出信号status
),即,它不提供锁所需的进度保证-自由。
正如 Matt Timmermans 已经指出的那样,索引环绕时会出现问题。不保证status
节点的 已经更新,或者,由于对上的操作status
不是顺序一致的,所以这个更新是否可见。当两个线程(在不同的轮次中)尝试推送到同一个节点时,这可能会导致数据竞争,因为两个线程都观察到node->status.load()
返回 false。
为了解决这个问题,您可以在节点中使用计数器而不是布尔值来跟踪节点所属的当前轮次(类似于 Dmitry Vukov 在此队列中的完成方式:http ://www.1024cores.net/home/无锁算法/队列/有界-mpmc-队列)
推荐阅读
- javascript - Angular SCSS 颜色变量在 IE11 中不起作用
- android - 如何修复发布 apk 断言在颤振中失败?
- java - Restfb - 为什么当我获取我的管理组时,应用程序会抛出 OAuthException:(#200)权限错误(代码 200,子代码为空)
- c++ - 我可以使用 try catch 语句来捕获任何错误而不是具体的吗?
- image - 无法在我的 minikube 中加载缓存的图像,我有 windows 10 企业版
- kubernetes - Kubernetes nginx ingress 访问集群外部而不使用服务
- javascript - 使用 for 循环和函数计算表上的行总数
- spring-boot - Springboot 通过过滤器获取动态 JSON 内容
- scala - 通过隐式类重载泛型方法
- acumatica - 为什么 Acumatica ERP PX.DataSource 在设计视图中出现渲染错误?