首页 > 解决方案 > 在 SPSC 框架中,condition_variable.notify_one() 信号不一致

问题描述

生产者在每次推入队列后,通过 conditionVar.notify_one() 向消费者发出信号。

然而,消费者在一些随机数的推送后醒来(因此随后的 notify_one()s)发生,有时是 21 有时是 2198,等等。在生产者中插入延迟(sleep_for() 或 yield())没有帮助。

我怎样才能使这个 SPSC 以锁步运行?

如何将此示例扩展到多个消费者(即 SPMC)?

void singleProducerSingleConsumer() {
    std::condition_variable conditionVar;
    std::mutex mtx;
    std::queue<int64_t> messageQueue;
    bool stopped = false;
    const std::size_t workSize = 4096;

    std::function<void(int64_t)> producerLambda = [&](int64_t id) {
        // Prepare a random number generator and push to the queue
        std::default_random_engine randomNumberGen{};
        std::uniform_int_distribution<int64_t> uniformDistribution{};

        for (auto count = 0; count < workSize; count++){
            //Always lock before changing state guarded by a mutex and condition_variable "conditionVar"
            std::lock_guard<std::mutex> lockGuard{ mtx };

            //Push a random number onto the queue
            messageQueue.push(uniformDistribution(randomNumberGen));

            //Notify the consumer
            conditionVar.notify_one();
            //std::this_thread::yield();
            /*std::this_thread::sleep_for(std::chrono::seconds(2));
            std::cout << "Producer woke " << std::endl;*/
        }
        //Production finished
        //Acquire the lock, set the stopped flag, inform the consumer
        std::lock_guard<std::mutex> lockGuard {mtx };

        std::cout << "Producer is done!" << std::endl;

        stopped = true;
        conditionVar.notify_one();
    };

    std::function<void(int64_t)> consumerLambda = [&](int64_t id) {

        do {
            std::unique_lock<std::mutex> uniqueLock{ mtx };
            //Acquire the lock only if stopped or the queue isn't empty
            conditionVar.wait(uniqueLock, [&]() {return stopped || !messageQueue.empty(); });

            //This thread owns the mutex here; pop the queue until it is empty
            std::cout << "Consumer received " << messageQueue.size() << " items" << std::endl;
            while (!messageQueue.empty()) {
                const auto val = messageQueue.front(); messageQueue.pop();
                std::cout << "Consumer obtained: " << val << std::endl;
            }
            uniqueLock.unlock();

            if (stopped) {
                //Producer has signaled a stop
                std::cout << "Consumer is done!" << std::endl;
                break;
            }

        } while (true);
    };

    std::thread consumer{ consumerLambda, 1 };
    std::thread producer{ producerLambda, 2 };
    consumer.join();
    producer.join();

    std::cout << "singleProducerSingleConsumer() finished" << std::endl;
}

标签: multithreadingmutexproducer-consumercondition-variablethread-synchronization

解决方案


如果我理解正确,您希望在生产者产生下一个数字之前消耗生产者产生的每个数字。那基本上是顺序执行,而不是并发执行。您可以通过简单地让生产者在普通函数调用中将值传递给消费者来最有效地完成顺序执行。结果不是生产者消费者模式的一个很好的例子。

线程由操作系统调度,与同时在您的计算机上执行的所有其他线程和进程竞争。您的生产者和消费者线程可能在同一个 cpu 内核上运行,这意味着它们必须按照操作系统的计划轮流执行。由于消费者在生产者写入数据之前无法消费数据,因此生产者将在其第一个执行窗口期间使用多个值填充消息队列才有意义,然后消费者将有时间消费消息队列中的值。因此,messageQueue 将分批填充和取消填充,直到程序完成。

您的解决方案应该扩展到处理多个消费者。


推荐阅读