首页 > 解决方案 > C 中带 pthreads + 信号量的生产者-消费者(同时访问的缓冲区槽)

问题描述

我用 C 编写了一个多线程的生产者-消费者程序。我的初始程序不是最小/完整/可验证的,所以我从头开始重写它,错误仍然存​​在。我发现多个生产者线程正在尝试生成相同的数据(从数据序列中),我不知道如何阻止它。

我尝试过的一些事情: - 我创建了一个与缓冲区大小相同的 bool 数组,以确保线程只将数据放入一个空插槽中,并从其中获取数据,这解决了一个问题 - 我尝试移动输入/输出槽递增到生产者/消费者的开头 while 循环(分别),但它会导致死锁

另一个不相关的问题(我明天可能会自己解决,但现在是这样)是序列中的许多值都被跳过了,尽管我希望它们都被生成。

请注意:我已经学习 C 一个月了,这也是我在这里的第一篇文章。在过去的几年里,我从这个网站学到了很多东西,而且我知道有时这里的反应可能会很苛刻,所以请善待那些真心想学习和提高的人。谢谢!:)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

char * Buffer;
int nBufferSlots = 10;
int nThreads = 2;
pthread_t Producers, Consumers;
sem_t Empty, Full, Mutex;
int inputSlot;
int outputSlot;
char value = 0;
char SlotOccupationBoolArray[10] = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

void * producer(void *args)
{
    while(1)
    {
        sleep( rand() % 3);

        value++;

        sem_wait(&Empty);
        sem_wait(&Mutex);

        if (SlotOccupationBoolArray[inputSlot] == 0)
        {
            Buffer[inputSlot] = value;
            SlotOccupationBoolArray[inputSlot] = 1;
        }
        else
        {
            value--;
        }

        inputSlot = (inputSlot + 1) % nBufferSlots;

        sem_post(&Mutex);
        sem_post(&Full);

        printf("\n[ ");
        for (int i=0;i<10;i++)
        {
            printf("%d ", Buffer[i]);
        }
        printf("]\n");
    }
}
void * consumer(void *args)
{
    while(1)
    {
        sleep( rand() % 5);

        sem_wait(&Full);
        sem_wait(&Mutex);

        if (SlotOccupationBoolArray[outputSlot] == 1)
        {
            char ValueConsumed = Buffer[outputSlot];
            printf("\nConsumed value: %d\n", ValueConsumed);
            Buffer[outputSlot] = 0;
            SlotOccupationBoolArray[outputSlot] = 0;
        }

        outputSlot = (outputSlot + 1) % nBufferSlots;

        sem_post(&Mutex);
        sem_post(&Empty);
    }
}

int main(void)
{
    sem_open("/Mutex", O_CREAT, S_IRUSR | S_IWUSR, 1);
    sem_open("/Full", O_CREAT, S_IRUSR | S_IWUSR, 0);

    Buffer = (char*) malloc(nBufferSlots*sizeof(char));

    sem_open("/Empty", O_CREAT, S_IRUSR | S_IWUSR, nBufferSlots);

    Producers = (pthread_t) malloc(nThreads*sizeof(pthread_t));
    Consumers = (pthread_t) malloc(nThreads*sizeof(pthread_t));

    // set charBuffer elements to 0
    for (char i=0;i<10;i++)
    {
        Buffer[i] = 0;
    }

    for (int i = 0; i < nThreads; i++)
    {
        pthread_create(&Producers, NULL, producer, NULL);
        pthread_create(&Consumers, NULL, consumer, NULL);
    }

    for(int i=0;i<nThreads;i++){
        int err = pthread_create(&Producers,NULL,producer,NULL);
        if(err != 0){
            printf("Error creating producer %d\n",i+1);
        }else{
            printf("Successfully created producer %d\n",i+1);
        }
    }

    for(int i=0;i<nThreads;i++){
        int err = pthread_create(&Consumers,NULL,consumer,NULL);
        if(err != 0){
            printf("Error creating consumer %d\n",i+1);
        }else{
            printf("Successfully created consumer %d\n",i+1);
        }
    }

    pthread_exit(NULL);
}

我要输出的所有内容是具有当前值的 Buffer 数组(每次运行生产者线程时)和已消耗的每个值。

我希望在输出中看到的是这样的:

但是,相反,我看到的是这样的(重复的 2/4 是问题):

(为格式化道歉)

标签: cpthreadsbuffersemaphoreproducer-consumer

解决方案


值被跳过和/或重复,因为在producer()函数中,value++是在互斥体/信号量之外完成的,并且值的读取以及在value--内部完成。

您应该记住的一件事是线程可以由操作系统以任何随机方式调度。因此,您应该使用互斥锁/信号量或任何其他机制来保护在多个线程之间共享的资源。

在您的程序中,可能会发生以下序列。

  1. 初始值=0。Producer-1 线程来做value++。现在值 = 1。它同时得到EmptyMutex
  2. 在 Producer-1 将值写入缓冲槽之前,Producer-2 会被调度并且它会执行value++。现在值 = 2。Producer-2 会得到Empty,但它必须等待,Mutex因为它被 Producer-1 锁定。
  3. 现在 Producer-1 再次被安排。因为value=2,此时,它会将相同的内容写入缓冲区。写入后,它同时解锁EmptyMutex
  4. 在此之后,Producer-2 被安排。它在MutexProducer-1 解锁时得到。仍然value=2如此,它会将相同的内容写入缓冲区。
  5. 请注意,如果在 Producer-2 写入之前安排了另一个 Producer 线程,则value可能仍会被更改。由于值是在外部更新的Mutex,因此它可以被更改的次数没有限制/数字被跳过。

那么,这个问题的解决方案是什么?

将代码更新值(或任何通用的共享资源)移到producer(). 在您的代码中,您不必先做value++,如果插槽不可用,请做value--。最好的解决方案是在写入缓冲区时更新值,如下所示。

    //value++; //Don't update it here. Move it to critical section of the code, inside mutex.

    sem_wait(&Empty);
    sem_wait(&Mutex);

    if (SlotOccupationBoolArray[inputSlot] == 0)
    {
        Buffer[inputSlot] = ++value; //Update and assign value, Read about pre-increment
        SlotOccupationBoolArray[inputSlot] = 1;
    }
    //else //This section is not required.
    //{
    //    value--;
    //}

除此之外,还有一个问题。sem_postforFull应该在 Producer 写入缓冲区中的插槽时发生。同样,当消费者从缓冲区中的插槽读取时,应该会发生sem_postfor 。Empty

但是你sem_post在每个循环中都在做。会引起一些问题。所以也要注意这个。仅当读/写发生时才为sem_post/Full做。Empty

PS:互斥量和信号量是不同的概念。根据用例适当地使用它们对于良好、有效和更具可读性的编程至关重要。


推荐阅读