首页 > 解决方案 > 使用共享队列的生产者-消费者(C 语言)

问题描述

我已经解决了代码中的死锁问题,在 1 个生产者和 1 个消费者的情况下,程序应该可以正常工作。但是,一旦引入多于 1 个请求者(requester)线程,同一行似乎会被多次写入 results.txt。奇怪的是,同一行被写入的次数恰好等于请求者线程的数量。我认为原因要么是我的某个退出条件有误,要么是我的解析函数在多线程下不是线程安全的。我检查过退出条件,在我看来没有问题。

我想问题在于我让多个请求者线程处理了同一个输入文件。我需要确保每个请求者线程一次只访问一个输入文件。

.h 文件

#ifndef MULTILOOKUP_H_
#define MULTILOOKUP_H_

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <sys/time.h>
#include "/home/user/Desktop/PA3/util.c"

/* Upper bound on the number of requester threads; main() refuses to start with more. */
#define MAX_REQUESTER_THREADS 10
/* Upper bound on the number of resolver threads; main() refuses to start with more. */
#define MAX_RESOLVER_THREADS 5
/* Number of slots in the input[] path array of struct arguments. */
#define MAX_INPUT_FILES 10
/* Capacity of the shared queue: push() ignores new items once size reaches this value. */
#define MAXSIZE 20

/*
 * Bundle of state handed to every requester and resolver thread as the
 * pthread start-routine argument.
 *
 * NOTE(review): main() passes the address of a single instance to all
 * threads, so the scratch buffers below (line, result, ipv6) are shared
 * between threads and are written without holding the mutex.
 */
struct arguments {
        struct queue *q;                /* shared queue linking requesters and resolvers */
        char *input[MAX_INPUT_FILES];   /* paths of the input files read by requesterThread */
        char *resLog;                   /* path opened for writing by resolverThread */
        char *reqLog;                   /* path opened for writing by requesterThread */
        char line[100];                 /* buffer requesterThread reads each input line into */
        char result[100];               /* buffer resolverThread copies the dequeued name into */
        char ipv6[100];                 /* buffer handed to dnslookup() and then written to the results log */
        int numInputFiles;              /* number of entries of input[] that requesterThread processes */
};

/* One entry of the singly linked queue; allocated by push() and freed by pop(). */
struct node {
    char name[100];     /* copy of the string passed to push() */
    struct node *next;  /* following entry in the queue */
};

/* Linked-list FIFO: push() appends at tail, pop() removes from head. */
struct queue {
    int size;           /* current number of nodes; compared against MAXSIZE by push() */
    struct node *head;  /* oldest node, the next one pop() removes; NULL after init() */
    struct node *tail;  /* newest node, where push() appends; NULL after init() */
};

/* Set the queue to empty (size 0, head and tail NULL). */
void init(struct queue *);
/* Remove and free the head node; does nothing when size is 0. */
void pop(struct queue *);
/* Append a copy of the given string; does nothing when size has reached MAXSIZE. */
void push(struct queue *, char *);
/* pthread start routine: reads lines from the input files and pushes them onto the queue. */
void* requesterThread(void *);
/* pthread start routine: pops names from the queue, calls dnslookup() and logs the result. */
void* resolverThread(void *);


/*
 * Synchronisation objects shared by all threads; initialised and destroyed in main().
 *
 * mutex - held around queue size checks and push()/pop() in both thread routines.
 * empty - requesters wait on it while the queue holds MAXSIZE items; resolvers
 *         signal it after a pop().
 * full  - resolvers wait on it while the queue size is 0; requesters signal it
 *         after a push().
 *
 * NOTE(review): these are definitions (no extern) placed in a header, so the
 * header is only safe to include from a single translation unit.
 */
pthread_mutex_t mutex;
pthread_cond_t empty, full;


#endif

.c 文件

#include "multilookup.h"

/*----------------------Struct Functions------------------------*/
/* Put a queue into its empty state: no nodes linked and a length of zero. */
void init(struct queue *q)
{
    q->size = 0;
    q->head = q->tail = NULL;
}

void pop(struct queue *q)
{
if (q->size > 0) {
    q->size--;
    struct node *tmp = q->head;
    q->head = q->head->next;
    free(tmp);  
} else {
    return;
}    
}


void push(struct queue *q, char *name)
{
if (q->size < MAXSIZE) {
    q->size++;
    if (q->head == NULL) {
        q->head = (struct node *)malloc(sizeof(struct node));
        strcpy(q->head->name, name);
        q->head->next == NULL;
        q->tail = q->head;
    } else {
        q->tail->next = (struct node *)malloc(sizeof(struct node));
        strcpy(q->tail->next->name, name);
        q->tail->next->next = NULL;
        q->tail = q->tail->next;
    }
} else {
    return;
}   
}
/*--------------------------End of struct functions------------------*/

/*
 * Producer thread routine.
 *
 * receivedStruct - pointer to a struct arguments.
 *
 * Opens reqLog for writing, then walks the first numInputFiles entries of
 * input[]. Every non-empty line of each file is pushed onto the shared
 * queue (waiting on `empty` while the queue is at MAXSIZE, signalling
 * `full` afterwards) and recorded in the log. When all files are done an
 * empty string is enqueued as the end-of-input sentinel.
 *
 * The process is terminated with exit(1) if the log or an input file
 * cannot be opened. Always returns NULL.
 *
 * NOTE(review): every thread running this routine starts at input[0] and
 * reads all files, so starting N requesters enqueues each line N times,
 * and each of them opens reqLog with "w". Distributing files between
 * requesters needs shared state that struct arguments does not carry.
 */
void* requesterThread(void *receivedStruct) {
    struct arguments *args_ptr = receivedStruct;

    FILE *sptr = fopen(args_ptr->reqLog, "w");
    if (sptr == NULL) {
        fprintf(stderr, "Error! Bogus output path.\n");
        exit(1);
    }

    /* Thread-local line buffer: the one inside *args_ptr is shared by all threads. */
    char line[sizeof args_ptr->line];

    /* Go through each file one at a time. */
    for (int fileCounter = 0; fileCounter < args_ptr->numInputFiles; fileCounter++) {
        FILE *fptr = fopen(args_ptr->input[fileCounter], "r");
        if (fptr == NULL) {
            fprintf(stderr, "Error! Bogus input file path.\n");
            exit(1);
        }

        /* fgets bounds the read to the buffer; longer lines arrive in several pieces. */
        while (fgets(line, sizeof line, fptr) != NULL) {
            line[strcspn(line, "\r\n")] = '\0';

            /* Blank lines carry no hostname and must not look like the sentinel. */
            if (line[0] == '\0') {
                continue;
            }

            pthread_mutex_lock(&mutex);
            while (args_ptr->q->size >= MAXSIZE) {
                pthread_cond_wait(&empty, &mutex);
            }
            push(args_ptr->q, line);
            pthread_cond_signal(&full);
            pthread_mutex_unlock(&mutex);

            fprintf(sptr, "%s \n", line);
            fprintf(sptr, "Thread %lu serviced %d file(s)\n",
                    (unsigned long)pthread_self(), fileCounter + 1);
        }

        /* Close each input as soon as it is exhausted so no handle leaks. */
        fclose(fptr);
    }

    /*
     * End-of-input sentinel. It is an empty string rather than NULL because
     * push() copies its argument as a C string. It goes through the same
     * lock/wait/signal sequence as real items so a waiting consumer wakes up.
     */
    pthread_mutex_lock(&mutex);
    while (args_ptr->q->size >= MAXSIZE) {
        pthread_cond_wait(&empty, &mutex);
    }
    push(args_ptr->q, "");
    pthread_cond_signal(&full);
    pthread_mutex_unlock(&mutex);

    fclose(sptr);
    return NULL;
}


/*
 * Consumer thread routine.
 *
 * receivedStruct - pointer to a struct arguments.
 *
 * Opens resLog for writing, then repeatedly takes the front name off the
 * shared queue (waiting on `full` while the queue is empty, signalling
 * `empty` afterwards), passes it to dnslookup() and writes one line per
 * name to the log: "name, address" when the lookup succeeds and "name, "
 * when it returns -1.
 *
 * The loop ends when an empty name is dequeued, which is treated as the
 * end-of-input sentinel. The queue size is never used as an exit test:
 * an empty queue only means the producer has not caught up yet.
 *
 * The process is terminated with exit(1) if the log cannot be opened.
 * Always returns NULL.
 */
void* resolverThread(void *receivedStruct) {
        struct arguments *args_ptr = receivedStruct;

        FILE *rptr = fopen(args_ptr->resLog, "w");
        if (rptr == NULL) {
               fprintf(stderr, "Error! Bogus output file path.\n");
               exit(1);
        }

        /* Thread-local buffers: the ones inside *args_ptr are shared by all threads. */
        char host[sizeof args_ptr->result];
        char ipv6[sizeof args_ptr->ipv6];

        for (;;) {
             pthread_mutex_lock(&mutex);
             while (args_ptr->q->size == 0) {
                 pthread_cond_wait(&full, &mutex);
             }

             /* Copy the name out before pop() frees the node that holds it. */
             snprintf(host, sizeof host, "%s", args_ptr->q->head->name);
             pop(args_ptr->q);

             pthread_cond_signal(&empty);
             pthread_mutex_unlock(&mutex);

             /* Empty name: nothing more will be produced for this consumer. */
             if (host[0] == '\0') {
                 break;
             }

             /* The lookup runs outside the lock so other threads are not held up. */
             if (dnslookup(host, ipv6, INET6_ADDRSTRLEN) == -1) {
                 fprintf(stderr, "Bogus hostname.\n");
                 fprintf(rptr, "%s, \n", host);
             } else {
                 fprintf(rptr, "%s, %s\n", host, ipv6);
             }
        }

        fclose(rptr);
        return NULL;
}


int main(/*int argc, char *argv[]*/) {

pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&empty, NULL);
pthread_cond_init(&full, NULL);

int num_req_threads;
int num_res_threads;
int rc1;
int rc2;

char *inputFiles[] = {"/home/user/Desktop/PA3/input/names1.txt", "/home/user/Desktop/PA3/input/names2.txt"}; //argv[5];

//instance of shared queue struct
struct queue q;
init(&q);

//instance of arguments struct
struct arguments args;

args.q = &q;
for (int i = 0; i < MAXINPUT; i++) {
    args.input[i] = inputFiles[i];//argv[5];
}
args.reqLog = "/home/user/Desktop/PA3/serviced.txt";//argv[3];
args.resLog = "/home/user/Desktop/PA3/results.txt"; //argv[4];
args.numInputFiles = 2;

num_req_threads = 1;
num_res_threads = 1;

if (num_req_threads > MAX_REQUESTER_THREADS) {
    printf("Cannot have more than 10 requester threads!\n"); exit(1);}
if (num_res_threads > MAX_RESOLVER_THREADS) {
    printf("Cannot have more than 5 requester threads!\n"); exit(1);}

//Thread Creation
pthread_t reqThreads[num_req_threads];
pthread_t resThreads[num_res_threads];

for(int i = 0; i < num_req_threads; i++) {
    rc1 = pthread_create(&reqThreads[i], NULL, requesterThread, (void *)&args);
    printf("Requester thread %d created.\n", i);
}
for (int j = 0; j < num_res_threads; j++) {
    rc2 = pthread_create(&resThreads[j], NULL, resolverThread, (void *) &args);
    printf("Resolver thread %d created.\n", j);
}
if(rc1 || rc2) {
    printf("Could not create threads.\n");
    exit(-1);
}
for (int m = 0; m < num_req_threads; m++) {
    pthread_join(reqThreads[m], 0);
}
for (int n = 0; n < num_res_threads; n++) {
    pthread_join(resThreads[n], 0);
}

pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&empty);
pthread_cond_destroy(&full);
return 0;
}

标签: c, multithreading

解决方案


推荐阅读