c - C 中带 pthreads + 信号量的生产者-消费者(同时访问的缓冲区槽)
问题描述
我用 C 编写了一个多线程的生产者-消费者程序。我的初始程序不是最小/完整/可验证的,所以我从头开始重写它,错误仍然存在。我发现多个生产者线程正在尝试生成相同的数据(从数据序列中),我不知道如何阻止它。
我尝试过的一些事情: - 我创建了一个与缓冲区大小相同的 bool 数组,以确保线程只将数据放入一个空插槽中,并从其中获取数据,这解决了一个问题 - 我尝试移动输入/输出槽递增到生产者/消费者的开头 while 循环(分别),但它会导致死锁
另一个不相关的问题(我明天可能会自己解决,但现在是这样)是序列中的许多值都被跳过了,尽管我希望它们都被生成。
请注意:我已经学习 C 一个月了,这也是我在这里的第一篇文章。在过去的几年里,我从这个网站学到了很多东西,而且我知道有时这里的反应可能会很苛刻,所以请善待那些真心想学习和提高的人。谢谢!:)
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
char * Buffer;
int nBufferSlots = 10;
int nThreads = 2;
pthread_t Producers, Consumers;
sem_t Empty, Full, Mutex;
int inputSlot;
int outputSlot;
char value = 0;
char SlotOccupationBoolArray[10] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
void * producer(void *args)
{
while(1)
{
sleep( rand() % 3);
value++;
sem_wait(&Empty);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[inputSlot] == 0)
{
Buffer[inputSlot] = value;
SlotOccupationBoolArray[inputSlot] = 1;
}
else
{
value--;
}
inputSlot = (inputSlot + 1) % nBufferSlots;
sem_post(&Mutex);
sem_post(&Full);
printf("\n[ ");
for (int i=0;i<10;i++)
{
printf("%d ", Buffer[i]);
}
printf("]\n");
}
}
void * consumer(void *args)
{
while(1)
{
sleep( rand() % 5);
sem_wait(&Full);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[outputSlot] == 1)
{
char ValueConsumed = Buffer[outputSlot];
printf("\nConsumed value: %d\n", ValueConsumed);
Buffer[outputSlot] = 0;
SlotOccupationBoolArray[outputSlot] = 0;
}
outputSlot = (outputSlot + 1) % nBufferSlots;
sem_post(&Mutex);
sem_post(&Empty);
}
}
int main(void)
{
sem_open("/Mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
sem_open("/Full", O_CREAT, S_IRUSR | S_IWUSR, 0);
Buffer = (char*) malloc(nBufferSlots*sizeof(char));
sem_open("/Empty", O_CREAT, S_IRUSR | S_IWUSR, nBufferSlots);
Producers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
Consumers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
// set charBuffer elements to 0
for (char i=0;i<10;i++)
{
Buffer[i] = 0;
}
for (int i = 0; i < nThreads; i++)
{
pthread_create(&Producers, NULL, producer, NULL);
pthread_create(&Consumers, NULL, consumer, NULL);
}
for(int i=0;i<nThreads;i++){
int err = pthread_create(&Producers,NULL,producer,NULL);
if(err != 0){
printf("Error creating producer %d\n",i+1);
}else{
printf("Successfully created producer %d\n",i+1);
}
}
for(int i=0;i<nThreads;i++){
int err = pthread_create(&Consumers,NULL,consumer,NULL);
if(err != 0){
printf("Error creating consumer %d\n",i+1);
}else{
printf("Successfully created consumer %d\n",i+1);
}
}
pthread_exit(NULL);
}
我要输出的所有内容是具有当前值的 Buffer 数组(每次运行生产者线程时)和已消耗的每个值。
我希望在输出中看到的是这样的:
- [ 1 0 0 0 0 0 0 0 0 0 ]
- [ 1 2 0 0 0 0 0 0 0 0 ]
- [ 1 2 3 0 0 0 0 0 0 0 ]
- 消耗值:1
- [ 0 2 3 4 0 0 0 0 0 0 ]
- [ 0 2 3 4 5 0 0 0 0 0 ]
- 消耗值:3
- [ 0 2 0 4 5 0 0 0 0 0 ]
- 消耗值:4
- [ 0 2 0 0 5 6 0 0 0 0 ]
- [ 0 2 0 0 5 6 7 0 0 0 ]
但是,相反,我看到的是这样的(重复的 2/4 是问题):
- [ 2 2 4 4 5 6 0 0 0 0 ]
- [ 2 2 4 4 5 6 7 0 0 0 ]
- [ 2 2 4 4 5 6 7 8 0 0 ]
- 消耗值:2
- 消耗值:2
- [ 0 0 4 4 5 6 7 8 9 0 ]
- 消耗值:4
(为格式化道歉)
解决方案
值被跳过和/或重复,因为在producer()
函数中,value++
是在互斥体/信号量之外完成的,并且值的读取以及在value--
内部完成。
您应该记住的一件事是线程可以由操作系统以任何随机方式调度。因此,您应该使用互斥锁/信号量或任何其他机制来保护在多个线程之间共享的资源。
在您的程序中,可能会发生以下序列。
- 初始值=0。Producer-1 线程来做
value++
。现在值 = 1。它同时得到Empty
和Mutex
。 - 在 Producer-1 将值写入缓冲槽之前,Producer-2 会被调度并且它会执行
value++
。现在值 = 2。Producer-2 会得到Empty
,但它必须等待,Mutex
因为它被 Producer-1 锁定。 - 现在 Producer-1 再次被安排。因为
value=2
,此时,它会将相同的内容写入缓冲区。写入后,它同时解锁Empty
和Mutex
。 - 在此之后,Producer-2 被安排。它在
Mutex
Producer-1 解锁时得到。仍然value=2
如此,它会将相同的内容写入缓冲区。 - 请注意,如果在 Producer-2 写入之前安排了另一个 Producer 线程,则
value
可能仍会被更改。由于值是在外部更新的Mutex
,因此它可以被更改的次数没有限制/数字被跳过。
那么,这个问题的解决方案是什么?
将代码更新值(或任何通用的共享资源)移到producer()
. 在您的代码中,您不必先做value++
,如果插槽不可用,请做value--
。最好的解决方案是在写入缓冲区时更新值,如下所示。
//value++; //Don't update it here. Move it to critical section of the code, inside mutex.
sem_wait(&Empty);
sem_wait(&Mutex);
if (SlotOccupationBoolArray[inputSlot] == 0)
{
Buffer[inputSlot] = ++value; //Update and assign value, Read about pre-increment
SlotOccupationBoolArray[inputSlot] = 1;
}
//else //This section is not required.
//{
// value--;
//}
除此之外,还有一个问题。sem_post
forFull
应该在 Producer 写入缓冲区中的插槽时发生。同样,当消费者从缓冲区中的插槽读取时,应该会发生sem_post
for 。Empty
但是你sem_post
在每个循环中都在做。会引起一些问题。所以也要注意这个。仅当读/写发生时才为sem_post
/Full
做。Empty
PS:互斥量和信号量是不同的概念。根据用例适当地使用它们对于良好、有效和更具可读性的编程至关重要。
推荐阅读
- python - VSCode 在 linter 中无法解析导入“sys”
- bash - Sed-替换紧接在特定模式之后的下一个字符串单词,但仅在文件中第一次出现
- c# - System.Random 的多个实例是否仍然在 .Net Core 中使用相同的种子?
- git - 重命名master但保留分支历史
- histogram - influxdb 通量中的非累积直方图函数
- pyspark - PySpark 添加 ID 列和过滤器损坏
- carousel - 如何停止 ng-bootstrap 轮播的幻灯片随机排序
- swift - 使用带绑定的 ForEach 循环会导致数组缩小时索引超出范围(SwiftUI)
- python-3.x - 如何将名称拆分为名字、中间名和姓氏,并在函数中一次全部分配到 Pandas 列中?
- bigtable - 将返回所有单元格的 Python BigTable 客户端“和”过滤器