c++ - 多个生产者 - 消费者问题坚持最后一次消费
问题描述
我正在尝试使用 pthreads 和信号量解决多生产者 - 消费者问题,但它总是停留在最后一次消费和停止。它将有 NO_ITEMS 的项目并假设缓冲区的大小为 BUFFER_SIZE
这是我下面的当前代码。
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <stack>
#define BUFFER_SIZE 50
#define NO_ITEMS 100
using namespace std;
void* thread_producer(void* args);
void* thread_consumer(void* args);
void addItem(int i);
void removeItem();
sem_t fillCount;
sem_t emptyCount;
pthread_mutex_t mutex;
stack<int> items;
static int count = 0;
int main()
{
sem_init(&fillCount, 0, 0);
sem_init(&emptyCount, 0, BUFFER_SIZE);
pthread_mutex_init(&mutex, nullptr);
pthread_t p1, c1, c2, c3;
pthread_create(&p1, nullptr, thread_producer, nullptr);
pthread_create(&c1, nullptr, thread_consumer, nullptr);
pthread_create(&c2, nullptr, thread_consumer, nullptr);
pthread_create(&c3, nullptr, thread_consumer, nullptr);
pthread_join(p1, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
sem_destroy(&fillCount);
sem_destroy(&emptyCount);
pthread_mutex_destroy(&mutex);
return 0;
}
void* thread_consumer(void* args) {
while (count < NO_ITEMS) {
sem_wait(&fillCount);
pthread_mutex_lock(&mutex);
if (!items.empty() && count < NO_ITEMS - 1) {
removeItem();
}
count++;
pthread_mutex_unlock(&mutex);
sem_post(&emptyCount);
}
return nullptr;
}
void* thread_producer(void* args) {
for (int i = 0; i < NO_ITEMS; i++) {
sem_wait(&emptyCount);
pthread_mutex_lock(&mutex);
addItem(i);
// sleep(1);
pthread_mutex_unlock(&mutex);
sem_post(&fillCount);
}
return nullptr;
}
void addItem(int i) {
cout << "Produced: " << i << endl;
items.push(i);
}
void removeItem() {
cout << "Consumed: " << items.top() << endl;
items.pop();
}
这是输出的一部分:
Consumed: 0
Produced: 96
Consumed: 96
Produced: 97
Produced: 98
Consumed: 98
Consumed: 97
Produced: 99 // halt
解决方案
有缺陷的逻辑
您的代码有逻辑问题。假设NO_ITEMS
是 100,到目前为止已经消耗了 99。让两个消费者线程在该点到达while
循环的顶部,并假设两者都读count
为 99(但见下文),因此进入循环体。两个消费者都会阻塞sem_wait()
,但最多还有一个项目要生产,所以生产者最多会再增加一次信号量,让至少一个消费者无限期地阻塞。
未定义的行为
此外,您的thread_consumer()
函数包含数据竞争,使您的程序的行为未定义。count
具体来说,条件中共享变量的读取while
没有正确同步。尽管人们无法可靠地预测 UB 将如何表现(否则它不会是“未定义的”),但非同步访问显示一个线程的明显失败以查看其他线程的共享变量更新是相当普遍的。这样的故障模式将自行解释您观察到的特定行为。
很可能,对这个同步问题的正确修复也可以解决逻辑问题。
解决方案
有多种可能的解决方案。以下是一些有希望的:
信号量不是特别适合这个问题。无论如何,您都需要一个互斥体,而它通常的信号对应物是条件变量。我会将两个信号量转换为两个(或者可能只是一个)普通整数变量,并在生产者和消费者中使用标准互斥锁 + CV 模式。这将包括为
count
消费者的读取添加互斥保护。另一方面,如果你有义务使用信号量,那么你可以
- 为消费者的阅读添加适当的互斥保护
count
- 一定要保留消费者在成功递减信号量后是否可以实际消费某项的测试
fillCount
在加入生产者线程之后但在尝试加入消费者之前让主程序发布两次(消费者数量 - 1 次) 。这将解除阻止任何认为他们能够消费一个项目但在最后一个项目被另一个消费者消费后仍然等待的消费者。
- 为消费者的阅读添加适当的互斥保护
或者您可以使用混合:保留
emptyCount
信号量以限制在任何给定时间等待的项目数量(而不是为此目的切换到 CV),但切换到互斥锁 + CV 模式来管理消费者。
推荐阅读
- database - 如何有效地从 DynamoDB 中检索海量数据?
- ios - 当应用程序通过推送通知从终止状态打开时,iOS Web 视图应用程序崩溃
- angular - 带有 aria-live 的可访问性 Angular5
- r - 2d 绘图为空(第三个变量中的 2 行和线条颜色)
- php - Behat 脚本无法检测到 CKEditor 工具栏元素
- google-cloud-platform - GCP 虚拟机实例 - 操作系统以其他用户身份登录
- html - 如何使这个左侧边栏在移动设备上可见?
- firebase - 如何获取云函数onCreate触发器文档ID
- android - Flutter 应用程序在启动时崩溃,log-cat 中没有错误接受一些警告
- antd - 关于“antd 在移动端无法正常运行”的澄清