c - 具有共享队列 C 的生产者消费者
问题描述
我能够解决我的代码中的死锁,并且对于 1 个生产者和 1 个消费者来说,它应该可以正常工作。但是,如果我引入超过 1 个请求者线程,我似乎会将同一行的多个副本打印到 results.txt。事实上,它打印的同一行的副本数量与请求者线程的数量一样多,这很奇怪。我认为这归结为我的退出条件之一已关闭,或者我的解析器函数对于多个线程来说不是线程安全的。我检查了我的退出条件,对我来说似乎没问题。
我想我在同一个输入文件上运行多个请求者线程。我需要确保每个请求者线程一次只访问一个输入文件,
.h 文件
#ifndef MULTILOOKUP_H_
#define MULTILOOKUP_H_
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <sys/time.h>
#include "/home/user/Desktop/PA3/util.c"
#define MAX_REQUESTER_THREADS 10
#define MAX_RESOLVER_THREADS 5
#define MAX_INPUT_FILES 10
#define MAXSIZE 20
struct arguments {
struct queue *q;
char *input[MAX_INPUT_FILES];
char *resLog;
char *reqLog;
char line[100];
char result[100];
char ipv6[100];
int numInputFiles;
};
struct node {
char name[100];
struct node *next;
};
struct queue {
int size;
struct node *head;
struct node *tail;
};
void init(struct queue *);
void pop(struct queue *);
void push(struct queue *, char *);
void* requesterThread(void *);
void* resolverThread(void *);
pthread_mutex_t mutex;
pthread_cond_t empty, full;
#endif
.c 文件
#include "multilookup.h"
/*----------------------Struct Functions------------------------*/
void init(struct queue *q) {
q->head = NULL;
q->tail = NULL;
q->size = 0;
}
void pop(struct queue *q)
{
if (q->size > 0) {
q->size--;
struct node *tmp = q->head;
q->head = q->head->next;
free(tmp);
} else {
return;
}
}
void push(struct queue *q, char *name)
{
if (q->size < MAXSIZE) {
q->size++;
if (q->head == NULL) {
q->head = (struct node *)malloc(sizeof(struct node));
strcpy(q->head->name, name);
q->head->next == NULL;
q->tail = q->head;
} else {
q->tail->next = (struct node *)malloc(sizeof(struct node));
strcpy(q->tail->next->name, name);
q->tail->next->next = NULL;
q->tail = q->tail->next;
}
} else {
return;
}
}
/*--------------------------End of struct functions------------------*/
void* requesterThread(void *receivedStruct) {
struct arguments *args_ptr;
args_ptr = (struct arguments*) receivedStruct;
FILE *fptr;
FILE *sptr;
int fileCounter = 0;
if ((sptr = fopen(args_ptr->reqLog, "w")) == NULL) {
fprintf(stderr, "Error! Bogus output path.\n");
exit(1);
}
//Read from input file and push to shared queue
while (1) {
if ((fptr = fopen(args_ptr->input[fileCounter], "r")) == NULL) {
fprintf(stderr, "Error! Bogus input file path.\n");
exit(1);
}
//Go through each file one at a time
while (fscanf(fptr,"%[^\n]%*c", args_ptr->line) != EOF) {
pthread_mutex_lock(&mutex);
while (args_ptr->q->size == MAXSIZE) {
pthread_cond_wait(&empty, &mutex);
}
push(args_ptr->q, args_ptr->line);
pthread_cond_signal(&full);
pthread_mutex_unlock(&mutex);
fprintf(sptr, "%s \n", args_ptr->line);
fprintf(sptr, "Thread %lu serviced %d file(s)\n", pthread_self(), fileCounter + 1);
}
fileCounter++;
//Condition to send poisonPill to consumer
if(fileCounter == args_ptr->numInputFiles) {
push(args_ptr->q, NULL);
break;
}
}
fclose(fptr);
fclose(sptr);
return 0;
}
void* resolverThread(void *receivedStruct) {
struct arguments *args_ptr;
args_ptr = (struct arguments *) receivedStruct;
FILE *rptr;
if( (rptr = fopen(args_ptr->resLog, "w")) == NULL) {
fprintf(stderr, "Error! Bogus output file path.\n");
exit(1);
}
while(1) {
pthread_mutex_lock(&mutex);
while (args_ptr->q->size == 0) {
pthread_cond_wait(&full, &mutex);
}
strcpy(args_ptr->result, args_ptr->q->head->name);
pop(args_ptr->q);
pthread_cond_signal(&empty);
pthread_mutex_unlock(&mutex);
if (args_ptr->result != NULL && args_ptr->q->size > 0) {
if (dnslookup(args_ptr->result, args_ptr->ipv6, INET6_ADDRSTRLEN) == -1){
fprintf(stderr, "Bogus hostname.\n");
fprintf(rptr, "%s, \n", args_ptr->result);
} else {
fprintf(rptr, "%s, %s\n", args_ptr->result, args_ptr->ipv6);
}
} else {break;}
}
fclose(rptr);
return 0;
}
int main(/*int argc, char *argv[]*/) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&full, NULL);
int num_req_threads;
int num_res_threads;
int rc1;
int rc2;
char *inputFiles[] = {"/home/user/Desktop/PA3/input/names1.txt", "/home/user/Desktop/PA3/input/names2.txt"}; //argv[5];
//instance of shared queue struct
struct queue q;
init(&q);
//instance of arguments struct
struct arguments args;
args.q = &q;
for (int i = 0; i < MAXINPUT; i++) {
args.input[i] = inputFiles[i];//argv[5];
}
args.reqLog = "/home/user/Desktop/PA3/serviced.txt";//argv[3];
args.resLog = "/home/user/Desktop/PA3/results.txt"; //argv[4];
args.numInputFiles = 2;
num_req_threads = 1;
num_res_threads = 1;
if (num_req_threads > MAX_REQUESTER_THREADS) {
printf("Cannot have more than 10 requester threads!\n"); exit(1);}
if (num_res_threads > MAX_RESOLVER_THREADS) {
printf("Cannot have more than 5 requester threads!\n"); exit(1);}
//Thread Creation
pthread_t reqThreads[num_req_threads];
pthread_t resThreads[num_res_threads];
for(int i = 0; i < num_req_threads; i++) {
rc1 = pthread_create(&reqThreads[i], NULL, requesterThread, (void *)&args);
printf("Requester thread %d created.\n", i);
}
for (int j = 0; j < num_res_threads; j++) {
rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
printf("Resolver thread %d created.\n", j);
}
if(rc1 || rc2) {
printf("Could not create threads.\n");
exit(-1);
}
for (int m = 0; m < num_req_threads; m++) {
pthread_join(reqThreads[m], 0);
}
for (int n = 0; n < num_res_threads; n++) {
pthread_join(resThreads[n], 0);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&empty);
pthread_cond_destroy(&full);
return 0;
}