c++ - C++20 semaphores in a queue application seem slow compared to condition variables
Problem description
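For learning purposes, I am comparing implementations of a single-producer single-consumer queue. To that end, I compared a condition variable implementation with a C++20 counting semaphore implementation. I would have guessed that the semaphore implementation would be faster, but it is not. On Windows, with MSVC, on my PC, the semaphore implementation is about 25% slower. I have included both implementations below.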
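The condition variable implementation has a small functional advantage: an abort can be implemented with the done() API function, whereas the semaphore implementation needs a special "stop" value to be queued to unblock the pulling thread and make it exit. In my mind, a single-producer single-consumer queue was the textbook application for semaphores, but apparently it is not.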
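Now I am wondering:
- Did I do something unwise that makes my semaphore implementation unnecessarily slow?
- Could Microsoft's counting semaphore implementation simply be too slow?
- Or do the requirements in the C++ standard make semaphores slow in general?
- Am I simply mistaken that a queue is a proper application for semaphores?
- If a queue is not a proper application, in which other applications do semaphores outperform condition variables?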
Condition variable implementation:
#include <array>
#include <mutex>
#include <condition_variable>
/*
 * locked_single_producer_single_consumer_queue_T is responsible for locked packet communication
 * between 2 threads. One thread pushes, the other thread pulls.
 */
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue_T
{
public:
    /* When the packet fits in the queue, push returns immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Full when head is one slot behind tail: one slot is always kept empty.
        m_cv.wait(lock, [this] { return ((m_tail - m_head) & m_mask) != 1; });
        m_data[m_head++] = packet;
        m_head &= m_mask;
        lock.unlock();
        m_cv.notify_one(); // wake the puller if it is waiting for data
    }
    /* When a packet can be retrieved from the queue, pull returns immediately. Otherwise it blocks until it can pull a packet.
       Returns false once done() was called and the queue has been drained. */
    bool pull(T& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return (((m_head - m_tail) & m_mask) != 0) || m_done; });
        if (((m_head - m_tail) & m_mask) != 0) [[likely]]
        {
            packet = m_data[m_tail++];
            m_tail &= m_mask;
            lock.unlock();
            m_cv.notify_one(); // wake the pusher if it is waiting for space
            return true;
        }
        return false;
    }
    /* done() indicates that the pushing thread stopped. The pulling thread can continue reading
       the remainder of the queue and should then return. */
    void done()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_tail{ 0 };
    int m_head{ 0 };
    bool m_done{};
};
Semaphore implementation:
#include <array>
#include <semaphore>
#include <atomic>
/*
 * locked_single_producer_single_consumer_queue2_T is responsible for locking packet communication
 * between 2 threads. One thread pushes, the other thread pulls.
 */
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue2_T
{
public:
    /* When the packet fits in the queue, push returns immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        m_available_space.acquire(); // wait for a free slot
        int head = m_head.load(std::memory_order_acquire);
        m_data[head] = packet;
        m_head.store((head + 1) & m_mask, std::memory_order_release); // wrap instead of letting head overflow
        m_available_packages.release(); // signal one more filled slot
    }
    /* When a packet can be retrieved from the queue, pull returns immediately. Otherwise it blocks until it can pull a packet. */
    T pull()
    {
        m_available_packages.acquire(); // wait for a filled slot
        int tail = m_tail.load(std::memory_order_acquire);
        T packet = m_data[tail];
        m_tail.store((tail + 1) & m_mask, std::memory_order_release); // wrap instead of letting tail overflow
        m_available_space.release(); // signal one more free slot
        return packet;
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::atomic_int m_tail{ 0 };
    std::atomic_int m_head{ 0 };
    std::counting_semaphore<N> m_available_space{ N };
    std::counting_semaphore<N> m_available_packages{ 0 };
};
*** EDIT ***
As requested, I have also included a complete test program. It already contains both implementations. (It requires C++20 with semaphores.)
#include <array>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <atomic>
#include <iostream>
#include <vector>
#include <algorithm>
#include <future>
#include <functional>
#include <string>
#include <thread>
#include <chrono>
#include <stdexcept>
/*
 * locked_single_producer_single_consumer_queue_T is responsible for locked packet communication
 * between 2 threads. One thread pushes, the other thread pulls.
 */
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue_T
{
public:
    /* When the packet fits in the queue, push returns immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // Full when head is one slot behind tail: one slot is always kept empty.
        m_cv.wait(lock, [this] { return ((m_tail - m_head) & m_mask) != 1; });
        m_data[m_head++] = packet;
        m_head &= m_mask;
        lock.unlock();
        m_cv.notify_one(); // wake the puller if it is waiting for data
    }
    /* When a packet can be retrieved from the queue, pull returns immediately. Otherwise it blocks until it can pull a packet.
       Returns false once done() was called and the queue has been drained. */
    bool pull(T& packet)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return (((m_head - m_tail) & m_mask) != 0) || m_done; });
        if (((m_head - m_tail) & m_mask) != 0) [[likely]]
        {
            packet = m_data[m_tail++];
            m_tail &= m_mask;
            lock.unlock();
            m_cv.notify_one(); // wake the pusher if it is waiting for space
            return true;
        }
        return false;
    }
    /* done() indicates that the pushing thread stopped. The pulling thread can continue reading
       the remainder of the queue and should then return. */
    void done()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_done = true;
        }
        m_cv.notify_one();
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::mutex m_mutex;
    std::condition_variable m_cv;
    int m_tail{ 0 };
    int m_head{ 0 };
    bool m_done{};
};
/*
 * locked_single_producer_single_consumer_queue2_T is responsible for locking packet communication
 * between 2 threads. One thread pushes, the other thread pulls.
 */
template<class T, int N = 16> // N must be a power of 2
class locked_single_producer_single_consumer_queue2_T
{
public:
    /* When the packet fits in the queue, push returns immediately. Otherwise it blocks until it can push the packet. */
    void push(T const& packet)
    {
        m_available_space.acquire(); // wait for a free slot
        int head = m_head.load(std::memory_order_acquire);
        m_data[head] = packet;
        m_head.store((head + 1) & m_mask, std::memory_order_release); // wrap instead of letting head overflow
        m_available_packages.release(); // signal one more filled slot
    }
    /* When a packet can be retrieved from the queue, pull returns immediately. Otherwise it blocks until it can pull a packet. */
    T pull()
    {
        m_available_packages.acquire(); // wait for a filled slot
        int tail = m_tail.load(std::memory_order_acquire);
        T packet = m_data[tail];
        m_tail.store((tail + 1) & m_mask, std::memory_order_release); // wrap instead of letting tail overflow
        m_available_space.release(); // signal one more free slot
        return packet;
    }
private:
    static_assert((N & (N - 1)) == 0, "N must be a power of 2");
    static signed int const m_mask = N - 1;
    using data_t = std::array<T, N>;
    data_t m_data;
    std::atomic_int m_tail{ 0 };
    std::atomic_int m_head{ 0 };
    std::counting_semaphore<N> m_available_space{ N };
    std::counting_semaphore<N> m_available_packages{ 0 };
};
/******************************************************************************************************/
using implementation_t = bool;
implementation_t const condition_variable = false;
implementation_t const semaphore = true;
/*
 * pusher() is a thread function that is responsible for pushing a defined
 * sequence of integers into the queue
 */
std::atomic_int sum_ref{};
template<class queue_t>
void pusher(std::atomic_bool& do_continue_token, queue_t& queue)
{
    int i = 0;
    while (do_continue_token.load(std::memory_order_acquire))
    {
        queue.push(i);
        sum_ref += i;
        ++i;
    }
}
/*
 * puller() is a thread function that is responsible for pulling
 * integers from the queue and accumulating them in sum_check, so that
 * test() can compare the result with sum_ref. It returns the last value pulled.
 */
std::atomic_int sum_check{};
template<implementation_t implementation, class queue_t>
int puller(queue_t& queue)
{
    int i = 0; // last value pulled (stays 0 if nothing was pulled)
    if constexpr (implementation == condition_variable)
    {
        while (queue.pull(i))
        {
            sum_check += i;
        }
    }
    if constexpr (implementation == semaphore)
    {
        int j;
        while ((j = queue.pull()) != -1) // -1 is the "stop" value
        {
            sum_check += j;
            i = j;
        }
    }
    return i;
}
/*
 * test() is responsible for kicking off two threads that push to and pull from
 * the queue for a duration of 10 s. test() returns the last integer value that was
 * pulled from the queue as an indication of speed.
 */
template<implementation_t implementation, class queue_t>
int test()
{
    using namespace std::chrono_literals;
    std::atomic_bool do_continue_token(true);
    queue_t queue;
    std::cout << '<' << std::flush;
    // std::launch::async forces real threads; the default policy may defer the call.
    std::future<void> fpusher = std::async(std::launch::async, pusher<queue_t>, std::ref(do_continue_token), std::ref(queue));
    std::future<int> fpuller = std::async(std::launch::async, puller<implementation, queue_t>, std::ref(queue));
    std::this_thread::sleep_for(10s);
    do_continue_token.store(false, std::memory_order_release);
    fpusher.wait();
    if constexpr (implementation == condition_variable)
    {
        queue.done(); // to stop the waiting thread
    }
    if constexpr (implementation == semaphore)
    {
        queue.push(-1); // "stop" value to unblock and stop the waiting thread
    }
    int i = fpuller.get();
    if (sum_check != sum_ref)
    {
        throw std::runtime_error("sum_check differs from sum_ref");
    }
    std::cout << '>' << std::endl;
    return i;
}
/*
 * main() is responsible for performing multiple tests of different implementations.
 * Results are collected, ordered and printed.
 */
int main()
{
    struct result_t
    {
        std::string m_name;
        int m_count;
    };
    using condition_variable_queue_t = locked_single_producer_single_consumer_queue_T<int, 1024>;
    using semaphore_queue_t = locked_single_producer_single_consumer_queue2_T<int, 1024>;
    std::vector<result_t> results // 6 runs
    {
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
        { "condition_variable", test<condition_variable, condition_variable_queue_t>() },
        { "semaphore", test<semaphore, semaphore_queue_t>() },
    };
    std::sort(results.begin(), results.end(), [](result_t const& lhs, result_t const& rhs) { return lhs.m_count < rhs.m_count; });
    std::cout << "The higher the count, the faster the solution" << std::endl;
    for (result_t const& result : results)
    {
        std::cout << result.m_name << ": " << result.m_count << std::endl;
    }
}
Output of a run:
<>
<>
<>
<>
<>
<>
The higher the count, the faster the solution
semaphore: 58304215
semaphore: 59302013
semaphore: 61896024
condition_variable: 84140445
condition_variable: 87045903
condition_variable: 90893057
Solution
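The question kept bugging me, so I looked into Microsoft's current semaphore implementation. The counting semaphore has two atomics and implements blocking waits by waiting on one of them. Note that the atomic wait is not called as long as the semaphore count has not reached zero. The implementation also only notifies (the atomic) when it has determined that at least one thread is waiting on it. Still, the semaphore implementation relies on the new C++20 wait/notify functions.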
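The new C++20 wait/notify functions are implemented with a pool of condition variables. I suppose that is optimal; at least I don't know of a faster way.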
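Ultimately, this semaphore implementation is based on condition variables, so I can imagine the "condition variable implementation" above being faster. Assuming the mutex is unlocked most of the time, acquiring it is cheap. Assuming that (thanks to the large queue size of 1024) we hardly ever have to wait for the condition variable predicate, m_cv.wait() is cheap as well.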
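The "semaphore implementation" is actually almost the same, except that it now has to read and write two atomics (m_head and m_tail). In the "condition variable implementation" these variables are implicitly protected by the mutex. My conclusion is therefore that these two atomics in the "semaphore implementation" make the difference. And, unfortunately, you cannot do without them (in the "semaphore implementation"), so the "condition variable implementation" is faster.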
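To answer the questions:
Q: Did I do something unwise that makes my semaphore implementation unnecessarily slow?
A: I don't know (yet).
Q: Could Microsoft's counting semaphore implementation simply be too slow?
A: It doesn't look like it.
Q: Or do the requirements in the C++ standard make semaphores slow in general?
A: Again, it doesn't look like it.
Q: Am I mistaken that a queue is a proper application for semaphores?
A: Yes, that was probably true in the early days.
Q: If a queue is not a proper application, in which other applications do semaphores outperform condition variables?
A: I don't know yet. Possibly an application that simply waits for a limited resource.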