c - 在所有生产者结束后如何正确终止多个消费者?
问题描述
一段时间后,我一直在寻找解决方案。我一直在一个最小的例子中工作,我有一个生产者和多个消费者,它似乎在大多数情况下都能正常工作(生产者和消费者没有死锁,所有的值都被消费一次,等等。 .)
但是,我不确定当生产者结束消费所有剩余的生产数据然后优雅地结束时如何向消费者发出信号。这是我准备的一个最小的工作示例。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>
#define bufferSize 6
#define consumerAmount 3
int buffer[bufferSize];
int producerCell = 0, consumerCell = 0;
sem_t mutexFill, mutexEmpty;
sem_t bufferHasSpace, bufferHasData;
bool producerAlive = true;
void *producer_routine()
{
int producedCell;
for (int i = 0; i < 20; i++)
{
sem_wait(&bufferHasSpace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell], &i, sizeof(i));
printf("Producer generated %i\n", i);
sem_post(&bufferHasData);
}
printf("Producer ended\n");
producerAlive = false;
}
void *consumer_routine(void *id)
{
int consumedCell, semaphoreValue;
do
{
sem_wait(&bufferHasData);
sem_wait(&mutexEmpty);
consumedCell = consumerCell;
consumerCell = (consumerCell + 1) % bufferSize;
sem_getvalue(&bufferHasData, &semaphoreValue);
sem_post(&mutexEmpty);
printf("Consumer %i processed %i\n", *(int *)id, buffer[consumedCell]);
sem_post(&bufferHasSpace);
}while (producerAlive || semaphoreValue > 0);
printf("Consumer %i ended\n", *(int *)id);
}
int main()
{
sem_init(&mutexFill, 1, 1);
sem_init(&mutexEmpty, 1, 1);
sem_init(&bufferHasSpace, 1, bufferSize);
sem_init(&bufferHasData, 1, 0);
pthread_t consumers[consumerAmount];
int consumerIDs[consumerAmount];
for (int i = 0; i < consumerAmount; i++)
{
consumerIDs[i] = i;
pthread_create(&consumers[i], NULL, &consumer_routine, &consumerIDs[i]);
}
pthread_t producer;
pthread_create(&producer, NULL, &producer_routine, NULL);
for (int i = 0; i < consumerAmount; i++)
{
pthread_join(consumers[i], NULL);
}
return 0;
}
代码片段的示例输出:
Producer generated 0
Producer generated 1
Consumer 0 processed 0
Consumer 1 processed 1
Producer generated 2
Producer generated 3
Consumer 2 processed 2
Consumer 2 processed 3
Producer generated 4
Producer generated 5
Producer generated 6
Consumer 1 processed 4
Consumer 1 processed 6
Producer generated 7
Consumer 0 processed 5
Producer generated 8
Consumer 1 processed 7
Producer generated 9
Producer generated 10
Consumer 0 processed 9
Consumer 0 processed 10
Consumer 2 processed 8
Producer generated 11
Producer generated 12
Producer generated 13
Producer generated 14
Producer generated 15
Producer generated 16
Consumer 2 processed 11
Consumer 2 processed 12
Consumer 2 processed 13
Consumer 2 processed 14
Consumer 2 processed 16
Consumer 0 processed 15
Producer generated 17
Producer generated 18
Producer generated 19
Consumer 1 processed 17
Consumer 2 processed 18
Producer ended
Consumer 0 processed 19
Consumer 0 ended
(It gets stuck here indefinitely until the process is killed)
我尝试了以下方法,试图让消费者在生产者结束然后退出后从缓冲区中读取剩余数据,但它们似乎都没有给我预期的结果。
- 不使用布尔标志并依赖于 bufferHasData 信号量:每个消费者消耗一次。之后,由于缓冲区为空,因此它退出。
- 将布尔标志与 bufferHasData 信号量结合使用(上面的示例实现了这一点):现在它可以正常工作,但只有第一个使用者可以优雅地退出。其余的消费者陷入等待更多数据的信号量中,这不会发生,因为生产者已经终止。
- 仅使用布尔标志:一旦生产者结束,消费者将直接终止而不处理缓冲区的其余部分。
- 不使用任何东西并让消费者无限期地运行:它消耗了所有数据,但现在我无法告诉消费者停止消费并优雅地退出。
那么,告诉消费者生产者已经完成的正确方法是什么,以及如何在多个消费者被锁定在信号量中的情况下消耗缓冲区的剩余部分?
解决方案
在所有生产者结束后如何正确终止多个消费者?
简单的解决方案是让生产者产生一个特殊的“终止你自己”值(每个消费者一个);消费者认识到这个值并终止自己而不是像往常一样处理它。
例如(使用INT_MAX
的是“终止你自己”的值):
void *producer_routine()
{
int producedCell;
int terminator = INT_MAX;
for (int i = 0; i < 20; i++)
{
sem_wait(&bufferHasSpace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell], &i, sizeof(i));
printf("Producer generated %i\n", i);
sem_post(&bufferHasData);
}
printf("Producer ending\n");
for (int i = 0; i < consumerAmount; i++)
{
sem_wait(&bufferHasSpace);
sem_wait(&mutexFill);
producedCell = producerCell;
producerCell = (producerCell + 1) % bufferSize;
sem_post(&mutexFill);
memcpy(&buffer[producedCell], &terminator, sizeof(terminator));
sem_post(&bufferHasData);
}
printf("Producer ended\n");
}
void *consumer_routine(void *id)
{
int consumedCell, semaphoreValue, value;
do
{
sem_wait(&bufferHasData);
sem_wait(&mutexEmpty);
consumedCell = consumerCell;
consumerCell = (consumerCell + 1) % bufferSize;
sem_getvalue(&bufferHasData, &semaphoreValue);
sem_post(&mutexEmpty);
value = buffer[consumedCell];
sem_post(&bufferHasSpace);
if(value != INT_MAX) {
printf("Consumer %i processed %i\n", *(int *)id, buffer[consumedCell]);
} else {
printf("Consumer %i told to terminate\n", *(int *)id);
break;
}
}while (semaphoreValue > 0);
printf("Consumer %i ended\n", *(int *)id);
}
推荐阅读
- sql - 在子查询中合并 SQL 行
- javascript - 我需要写困难的回文
- python - 我的输出没有正确显示我是否正确调用了我的定义,我是否正确返回了这些定义的值?
- amazon-s3 - 在 Fargate 容器中,为什么我可以 CRUD S3 但无法创建预签名帖子
- sql-server - 无法从迁移中创建触发器
- python - 新消息到达时如何中断while循环?
- c# - Entity Framework Core Table-per-Hierarchy:防止将“未注册的类”添加为基本类型
- python - 受试者之间的分类出现计数
- arduino - 为什么我使用 Arduino 键盘输出错误?
- php - PHP/MVC/PDO - 数据库类之外的 beginTransaction