multithreading - 在 SPSC 框架中,condition_variable.notify_one() 信号不一致
问题描述
生产者在每次推入队列后,通过 conditionVar.notify_one() 向消费者发出信号。
然而,消费者在一些随机数的推送后醒来(因此随后的 notify_one()s)发生,有时是 21 有时是 2198,等等。在生产者中插入延迟(sleep_for() 或 yield())没有帮助。
我怎样才能使这个 SPSC 以锁步运行?
如何将此示例扩展到多个消费者(即 SPMC)?
void singleProducerSingleConsumer() {
std::condition_variable conditionVar;
std::mutex mtx;
std::queue<int64_t> messageQueue;
bool stopped = false;
const std::size_t workSize = 4096;
std::function<void(int64_t)> producerLambda = [&](int64_t id) {
// Prepare a random number generator and push to the queue
std::default_random_engine randomNumberGen{};
std::uniform_int_distribution<int64_t> uniformDistribution{};
for (auto count = 0; count < workSize; count++){
//Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
std::lock_guard<std::mutex> lockGuard{ mtx };
//Push a random number onto the queue
messageQueue.push(uniformDistribution(randomNumberGen));
//Notify the consumer
conditionVar.notify_one();
//std::this_thread::yield();
/*std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Producer woke " << std::endl;*/
}
//Production finished
//Acquire the lock, set the stopped flag, inform the consumer
std::lock_guard<std::mutex> lockGuard {mtx };
std::cout << "Producer is done!" << std::endl;
stopped = true;
conditionVar.notify_one();
};
std::function<void(int64_t)> consumerLambda = [&](int64_t id) {
do {
std::unique_lock<std::mutex> uniqueLock{ mtx };
//Acquire the lock only if stopped or the queue isn't empty
conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });
//This thread owns the mutex here; pop the queue until it is empty
std::cout << "Consumer received " << messageQueue.size() << " items" << std::endl;
while (!messageQueue.empty()) {
const auto val = messageQueue.front(); messageQueue.pop();
std::cout << "Consumer obtained: " << val << std::endl;
}
uniqueLock.unlock();
if (stopped) {
//Producer has signaled a stop
std::cout << "Consumer is done!" << std::endl;
break;
}
} while (true);
};
std::thread consumer{ consumerLambda, 1 };
std::thread producer{ producerLambda, 2 };
consumer.join();
producer.join();
std::cout << "singleProducerSingleConsumer() finished" << std::endl;
}
解决方案
如果我理解正确,您希望在生产者产生下一个数字之前消耗生产者产生的每个数字。那基本上是顺序执行,而不是并发执行。您可以通过简单地让生产者在普通函数调用中将值传递给消费者来最有效地完成顺序执行。结果不是生产者消费者模式的一个很好的例子。
线程由操作系统调度,与同时在您的计算机上执行的所有其他线程和进程竞争。您的生产者和消费者线程可能在同一个 cpu 内核上运行,这意味着它们必须按照操作系统的计划轮流执行。由于消费者在生产者写入数据之前无法消费数据,因此生产者将在其第一个执行窗口期间使用多个值填充消息队列才有意义,然后消费者将有时间消费消息队列中的值。因此,messageQueue 将分批填充和取消填充,直到程序完成。
您的解决方案应该扩展到处理多个消费者。
推荐阅读
- node.js - 显示无法找到 package.json。Node.js 启动时可能会出现问题。验证 package.json 是否有效或将代码放在名为 server.js 或 app.js 的文件中
- node.js - npm install 时不同的节点版本冲突
- java - 如何为使用apache-poi java将数据写入word文档的类编写单元测试?
- c# - DoubleClick 在 DataGrid 和 DataTemplate 中触发
- azure-active-directory - MS Graph:获取用户角色和手机
- java - 启动 JavaFX 应用程序时的内部 NPE
- amazon-redshift - 如何在 Redshift 中查询架构信息?
- java - 从 WildFly8 / jdk8 迁移到 WildFly14 / jdk11 后 WebService 客户端中的 java.net.SocketTimeoutException
- flutter - 由于我的代码中的 setState(),我的动画滞后
- orbeon - 使用请求参数更改验证警报