首页 > 解决方案 > 多个生产者 - 消费者问题坚持最后一次消费

问题描述

我正在尝试使用 pthreads 和信号量解决多生产者 - 消费者问题,但它总是停留在最后一次消费和停止。它将有 NO_ITEMS 的项目并假设缓冲区的大小为 BUFFER_SIZE

这是我下面的当前代码。

#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100

using namespace std;

void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();

sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;

stack<int> items;
static int count = 0;



int main()
{
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_mutex_init(&mutex, nullptr);
    pthread_t p1, c1, c2, c3;

    pthread_create(&p1, nullptr, thread_producer, nullptr);
    pthread_create(&c1, nullptr, thread_consumer, nullptr);
    pthread_create(&c2, nullptr, thread_consumer, nullptr);
    pthread_create(&c3, nullptr, thread_consumer, nullptr);

    pthread_join(p1, nullptr);
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(c3, nullptr);

    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    pthread_mutex_destroy(&mutex);

    return 0;
}

void* thread_consumer(void* args) {

    while (count < NO_ITEMS) {
        sem_wait(&fillCount);
        pthread_mutex_lock(&mutex);

        if (!items.empty() && count < NO_ITEMS - 1) {
            removeItem();
        }

        count++;
        pthread_mutex_unlock(&mutex);
        sem_post(&emptyCount);
    }

    return nullptr;
}

void* thread_producer(void* args) {
    for (int i = 0; i < NO_ITEMS; i++) {
        sem_wait(&emptyCount);
        pthread_mutex_lock(&mutex);

        addItem(i);
        // sleep(1);

        pthread_mutex_unlock(&mutex);
        sem_post(&fillCount);
    }

    return nullptr;

}

void addItem(int i) {
    cout << "Produced: " << i << endl;
    items.push(i);
}

void removeItem() {
    cout << "Consumed: " << items.top() << endl;
    items.pop();

}

这是输出的一部分:

Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt

标签: c++pthreadssemaphoreproducer-consumer

解决方案


有缺陷的逻辑

您的代码有逻辑问题。假设NO_ITEMS是 100,到目前为止已经消耗了 99。让两个消费者线程在该点到达while循环的顶部,并假设两者都读count为 99(但见下文),因此进入循环体。两个消费者都会阻塞sem_wait(),但最多还有一个项目要生产,所以生产者最多会再增加一次信号量,让至少一个消费者无限期地阻塞。

未定义的行为

此外,您的thread_consumer()函数包含数据竞争,使您的程序的行为未定义。count具体来说,条件中共享变量的读取while没有正确同步。尽管人们无法可靠地预测 UB 将如何表现(否则它不会是“未定义的”),但非同步访问显示一个线程的明显失败以查看其他线程的共享变量更新是相当普遍的。这样的故障模式将自行解释您观察到的特定行为。

很可能,对这个同步问题的正确修复也可以解决逻辑问题。

解决方案

有多种可能的解决方案。以下是一些有希望的:

  1. 信号量不是特别适合这个问题。无论如何,您都需要一个互斥体,而它通常的信号对应物是条件变量。我会将两个信号量转换为两个(或者可能只是一个)普通整数变量,并在生产者和消费者中使用标准互斥锁 + CV 模式。这将包括为count消费者的读取添加互斥保护。

  2. 另一方面,如果你有义务使用信号量,那么你可以

    • 为消费者的阅读添加适当的互斥保护count
    • 一定要保留消费者在成功递减信号量后是否可以实际消费某项的测试
    • fillCount在加入生产者线程之后但在尝试加入消费者之前让主程序发布两次(消费者数量 - 1 次) 。这将解除阻止任何认为他们能够消费一个项目但在最后一个项目被另一个消费者消费后仍然等待的消费者。
  3. 或者您可以使用混合:保留emptyCount信号量以限制在任何给定时间等待的项目数量(而不是为此目的切换到 CV),但切换到互斥锁 + CV 模式来管理消费者。


推荐阅读