首页 > 解决方案 > 在所有生产者结束后如何正确终止多个消费者?

问题描述

一段时间后,我一直在寻找解决方案。我一直在一个最小的例子中工作,我有一个生产者和多个消费者,它似乎在大多数情况下都能正常工作(生产者和消费者没有死锁,所有的值都被消费一次,等等。 .)

但是,我不确定当生产者结束消费所有剩余的生产数据然后优雅地结束时如何向消费者发出信号。这是我准备的一个最小的工作示例。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>
#define bufferSize 6
#define consumerAmount 3

int buffer[bufferSize];
int producerCell = 0, consumerCell = 0;
sem_t mutexFill, mutexEmpty;
sem_t bufferHasSpace, bufferHasData;
bool producerAlive = true;

void *producer_routine()
{
    int producedCell;
    for (int i = 0; i < 20; i++)
    {
        sem_wait(&bufferHasSpace);
        sem_wait(&mutexFill);
        producedCell = producerCell;
        producerCell = (producerCell + 1) % bufferSize;
        sem_post(&mutexFill);
        memcpy(&buffer[producedCell], &i, sizeof(i));
        printf("Producer generated %i\n", i);
        sem_post(&bufferHasData);
    }
    printf("Producer ended\n");
    producerAlive = false;
}

void *consumer_routine(void *id)
{
    int consumedCell, semaphoreValue;
    do
    {
        sem_wait(&bufferHasData);
        sem_wait(&mutexEmpty);
        consumedCell = consumerCell;
        consumerCell = (consumerCell + 1) % bufferSize;
        sem_getvalue(&bufferHasData, &semaphoreValue);
        sem_post(&mutexEmpty);
        printf("Consumer %i processed %i\n", *(int *)id, buffer[consumedCell]);
        sem_post(&bufferHasSpace);
    }while (producerAlive || semaphoreValue > 0);
    printf("Consumer %i ended\n", *(int *)id);

}

int main()
{
    sem_init(&mutexFill, 1, 1);
    sem_init(&mutexEmpty, 1, 1);
    sem_init(&bufferHasSpace, 1, bufferSize);
    sem_init(&bufferHasData, 1, 0);
    
    pthread_t consumers[consumerAmount];
    int consumerIDs[consumerAmount];
    for (int i = 0; i < consumerAmount; i++)
    {
        consumerIDs[i] = i;
        pthread_create(&consumers[i], NULL, &consumer_routine, &consumerIDs[i]);
    }
    
    pthread_t producer;
    pthread_create(&producer, NULL, &producer_routine, NULL);

    for (int i = 0; i < consumerAmount; i++)
    {
        pthread_join(consumers[i], NULL);
    }
    return 0;
}

代码片段的示例输出:

Producer generated 0
Producer generated 1
Consumer 0 processed 0
Consumer 1 processed 1
Producer generated 2
Producer generated 3
Consumer 2 processed 2
Consumer 2 processed 3
Producer generated 4
Producer generated 5
Producer generated 6
Consumer 1 processed 4
Consumer 1 processed 6
Producer generated 7
Consumer 0 processed 5
Producer generated 8
Consumer 1 processed 7
Producer generated 9
Producer generated 10
Consumer 0 processed 9
Consumer 0 processed 10
Consumer 2 processed 8
Producer generated 11
Producer generated 12
Producer generated 13
Producer generated 14
Producer generated 15
Producer generated 16
Consumer 2 processed 11
Consumer 2 processed 12
Consumer 2 processed 13
Consumer 2 processed 14
Consumer 2 processed 16
Consumer 0 processed 15
Producer generated 17
Producer generated 18
Producer generated 19
Consumer 1 processed 17
Consumer 2 processed 18
Producer ended
Consumer 0 processed 19
Consumer 0 ended
(It gets stuck here indefinitely until the process is killed)

我尝试了以下方法,试图让消费者在生产者结束然后退出后从缓冲区中读取剩余数据,但它们似乎都没有给我预期的结果。

那么,告诉消费者生产者已经完成的正确方法是什么,以及如何在多个消费者被锁定在信号量中的情况下消耗缓冲区的剩余部分?

标签: cmultithreadingpthreadssemaphoreproducer-consumer

解决方案


在所有生产者结束后如何正确终止多个消费者?

简单的解决方案是让生产者产生一个特殊的“终止你自己”值(每个消费者一个);消费者认识到这个值并终止自己而不是像往常一样处理它。

例如(使用INT_MAX的是“终止你自己”的值):

void *producer_routine()
{
    int producedCell;
    int terminator = INT_MAX;

    for (int i = 0; i < 20; i++)
    {
        sem_wait(&bufferHasSpace);
        sem_wait(&mutexFill);
        producedCell = producerCell;
        producerCell = (producerCell + 1) % bufferSize;
        sem_post(&mutexFill);
        memcpy(&buffer[producedCell], &i, sizeof(i));
        printf("Producer generated %i\n", i);
        sem_post(&bufferHasData);
    }

    printf("Producer ending\n");
    for (int i = 0; i < consumerAmount; i++)
    {
        sem_wait(&bufferHasSpace);
        sem_wait(&mutexFill);
        producedCell = producerCell;
        producerCell = (producerCell + 1) % bufferSize;
        sem_post(&mutexFill);
        memcpy(&buffer[producedCell], &terminator, sizeof(terminator));
        sem_post(&bufferHasData);
    }
    printf("Producer ended\n");
}

void *consumer_routine(void *id)
{
    int consumedCell, semaphoreValue, value;
    do
    {
        sem_wait(&bufferHasData);
        sem_wait(&mutexEmpty);
        consumedCell = consumerCell;
        consumerCell = (consumerCell + 1) % bufferSize;
        sem_getvalue(&bufferHasData, &semaphoreValue);
        sem_post(&mutexEmpty);
        value = buffer[consumedCell];
        sem_post(&bufferHasSpace);
        if(value != INT_MAX) {
            printf("Consumer %i processed %i\n", *(int *)id, buffer[consumedCell]);
        } else {
            printf("Consumer %i told to terminate\n", *(int *)id);
            break;
        }
    }while (semaphoreValue > 0);
    printf("Consumer %i ended\n", *(int *)id);
}

推荐阅读