首页 > 解决方案 > 是什么导致服务器一遍又一遍地读取客户端的最后一条消息?

问题描述

鉴于服务器的代码,job_read 函数中出现了一个真正无法解释的错误。

如果此服务器正在与客户端连接,一切似乎都可以正常工作 - 尽管可能存在一些其他错误。如果客户端连接,将启动一个新线程,然后读取客户端的输入。如果客户端发送消息,则此消息将传播给所有其他客户端。客户端通过终端运行。如果您使用 STRG + C 停止客户端,一切都会正常工作,但如果您在退出终端时崩溃,则不会。

#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdbool.h>

#define BUFLEN 255
#define MAX_CONNECTIONS 128
#define MAX_NAME_LENGTH 30

struct JOB_INFO {
    int socket_ids[MAX_CONNECTIONS];
    bool end_of_program;
    int open_cnncts;
    pthread_mutex_t socketids_changingMutex;
};

struct SERVER_INFO {
    struct sockaddr_in serv_add;
    struct sockaddr_in cli_adr;
    socklen_t cli_len;
    pthread_t write_thread;
};
//Global Variables
FILE* plog_file;

void delete_socket(int socketid, struct JOB_INFO* pjob_data);
void* job_read(void*);
void* job_write(void*);
void error(const char* msg);
void setUpFileStream(int argc, char* argv[]);
int setUpConnections(struct SERVER_INFO* pserver_data, char* port_numc);
void enterConnectingLoop(struct SERVER_INFO* pserver_data, struct JOB_INFO* pjob_data, int sockfd);

void error(const char* msg){
    perror(msg);
    exit(1);
}

int main(int argc, char* argv[]) {
    setUpFileStream(argc, argv);

    int sock_fd;
    struct JOB_INFO job_data;
    job_data.end_of_program = false;
    job_data.open_cnncts = 0;

    struct SERVER_INFO server_data;
    //Create mutex
    if(pthread_mutex_init(&(job_data.socketids_changingMutex), NULL) < 0){
        error("Could not initialize Mutex");
    }
    //Initialzing threads and create write_thread
    pthread_create(&server_data.write_thread, NULL, job_write, (void*)&job_data);

    //Setup for connections
    sock_fd = setUpConnections(&server_data, argv[1]);
    fprintf(plog_file,"Listening...");
    fflush(stdout); //For unknown reason this is needed.
    enterConnectingLoop(&server_data, &job_data, sock_fd);

    job_data.end_of_program = true;
    close(sock_fd);

    pthread_join(server_data.write_thread, NULL);
    pthread_mutex_destroy(&job_data.socketids_changingMutex);
    return EXIT_SUCCESS;
}

void* job_read(void * p){

    char** pc = (char**)p; //allow pointer arithmetic
    int new_sock_fd = *((int*) (pc[1]));  //Casting
    struct JOB_INFO* pjob_data = ((struct JOB_INFO*) (pc[0])); //Casting

    ssize_t n; //Error catching variable
    ssize_t m; //Error catching variable
    char buffer[BUFLEN];
    char name[MAX_NAME_LENGTH];
    sprintf(name, "Client %d: ", new_sock_fd);
    fprintf(plog_file, "Reading from %s\n", name);

    while(!pjob_data->end_of_program){
        n = read(new_sock_fd, buffer, BUFLEN-1);
        if ( n > 0 ) buffer[n] = '\0';  // add a \0 on success
        if(n<0){
            printf("Buffer: %s", buffer);
            error("Reading Failed");
        }
        pthread_mutex_lock(&pjob_data->socketids_changingMutex);
        if(n == 0){
            delete_socket(new_sock_fd, pjob_data);
            pthread_exit(NULL);
        }

        for(int i = 0; i < pjob_data->open_cnncts; i++){
            if(pjob_data->socket_ids[i] == new_sock_fd){
                continue;
            }
            m = write(pjob_data->socket_ids[i], name, strlen(name));
            if((m < 0)){
                error("Writing name failed");
            }
            n = write(pjob_data->socket_ids[i], buffer, strlen(buffer));
            if((n < 0)){
                error("Writing message failed");
            }
        }
        pthread_mutex_unlock(&pjob_data->socketids_changingMutex);
        printf("%s%s", name, buffer);
        fflush(stdout); //For unknown reason this is needed.
    }
    delete_socket(new_sock_fd, pjob_data);
    free(p);
    pthread_exit( NULL );
}

void* job_write(void* args){
    struct JOB_INFO* pjob_data = (struct JOB_INFO*)(args);
    fprintf(plog_file, "Started writing thread...\n");
    ssize_t n; //Error catching variable
    ssize_t m; //Error catching variable
    char buffer[BUFLEN];
    char* name = "Server: \0";

    while(!pjob_data->end_of_program) {
        fgets(buffer, BUFLEN, stdin);

        pthread_mutex_lock(&pjob_data->socketids_changingMutex);
        for(int i = 0; i < pjob_data->open_cnncts; i++){
            m = write(pjob_data->socket_ids[i], name, strlen(name));
            if((m < 0)){
                error("Writing name failed");
            }
            n = write(pjob_data->socket_ids[i], buffer, strlen(buffer));
            if((n < 0)){
                error("Writing message failed");
            }
        }
        pthread_mutex_unlock(&pjob_data->socketids_changingMutex);
        if(strcmp("Bye\n", buffer) == 0){
            exit(EXIT_SUCCESS);
        }
    }
    free(args);
    pjob_data->end_of_program = true;
    pthread_exit( NULL );
}

void enterConnectingLoop(struct SERVER_INFO* pserver_data, struct JOB_INFO* pjob_data, int sockfd){
    listen(sockfd, MAX_CONNECTIONS);

    for(pjob_data->open_cnncts = 0; (!pjob_data->end_of_program); /*mutex needs to be set*/ ){
        void** p = malloc(2*sizeof(void*));
        p[0] = (void*)pjob_data;
        p[1] = (void*)&(pjob_data->socket_ids[pjob_data->open_cnncts]);
        pjob_data->socket_ids[pjob_data->open_cnncts] =
                accept(sockfd, (struct sockaddr*) &pserver_data->cli_adr, &pserver_data->cli_len);
        pthread_mutex_lock(&(pjob_data->socketids_changingMutex)); //Note you cannot mutex the accept()
        printf("SOCKETFD %d\n", (pjob_data->socket_ids[pjob_data->open_cnncts]));
        fprintf(plog_file,"Client connected.\n");
        pthread_t thread;
        pthread_create(&thread , NULL, job_read, p);
        pjob_data->open_cnncts++;
        pthread_mutex_unlock(&(pjob_data->socketids_changingMutex));
    }
}

void delete_socket(int socketid, struct JOB_INFO* pjob_data){
    bool found = false;
    for(int i = 0; i < pjob_data->open_cnncts; i++){
        if(found){
            pjob_data->socket_ids[i-1] = pjob_data->socket_ids[i];
        }
        if(pjob_data->socket_ids[i] == socketid){
            close(socketid);
            found = true;
        }
    }
    if(found){
        pjob_data->open_cnncts--;
    }
}

inline void setUpFileStream(int argc, char* argv[]){
    printf("Server started...\n");
    if(argc < 2){
        fprintf(stderr, "You must provide a port number");
        exit(EXIT_FAILURE);
    }
    if(argc == 3){
        plog_file = fopen(argv[2], "w");
    } else {
        plog_file = fopen("logfile.txt", "w");
    }
    plog_file = stdout;
    stderr = plog_file;
}

inline int setUpConnections(struct SERVER_INFO* pserver_data, char* port_numc){
    pserver_data->cli_len = sizeof(pserver_data->cli_adr);
    memset(&pserver_data->serv_add, 0, sizeof((pserver_data->serv_add)));

    uint16_t port_num = (uint16_t)atoi(port_numc);
    pserver_data->serv_add.sin_family = AF_INET;
    pserver_data->serv_add.sin_addr.s_addr = INADDR_ANY;
    pserver_data->serv_add.sin_port = htons(port_num);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        error("Error opening socket.");
    }

    if(bind(sockfd, (struct sockaddr*) (&pserver_data->serv_add), sizeof(pserver_data->serv_add)) < 0){
        error("Binding failed.");
    }
    return sockfd;
}", buffer) == 0){
            exit(EXIT_SUCCESS);
        }
    }
    free(args);
    pjob_data->end_of_program = true;
    pthread_exit( NULL );
}

void enterConnectingLoop(struct SERVER_INFO* pserver_data, struct JOB_INFO* pjob_data, int sockfd){
    listen(sockfd, MAX_CONNECTIONS);

    for(pjob_data->open_cnncts = 0; (!pjob_data->end_of_program); /*mutex needs to be set*/ ){
        void** p = malloc(2*sizeof(void*));
        p[0] = (void*)pjob_data;
        p[1] = (void*)&(pjob_data->socket_ids[pjob_data->open_cnncts]);
        pjob_data->socket_ids[pjob_data->open_cnncts] =
                accept(sockfd, (struct sockaddr*) &pserver_data->cli_adr, &pserver_data->cli_len);
        pthread_mutex_lock(&(pjob_data->socketids_changingMutex)); //Note you cannot mutex the accept()
        printf("SOCKETFD %d\n", (pjob_data->socket_ids[pjob_data->open_cnncts]));
        fprintf(plog_file,"Client connected.\n");
        pthread_t thread;
        pthread_create(&thread , NULL, job_read, p);
        pjob_data->open_cnncts++;
        pthread_mutex_unlock(&(pjob_data->socketids_changingMutex));
    }
}

void delete_socket(int socketid, struct JOB_INFO* pjob_data){
    bool found = false;
    for(int i = 0; i < pjob_data->open_cnncts; i++){
        if(found){
            pjob_data->socket_ids[i-1] = pjob_data->socket_ids[i];
        }
        if(pjob_data->socket_ids[i] == socketid){
            close(socketid);
            found = true;
        }
    }
    if(found){
        pjob_data->open_cnncts--;
    }
}

inline void setUpFileStream(int argc, char* argv[]){
    printf("Server started...\n");
    if(argc < 2){
        fprintf(stderr, "You must provide a port number");
        exit(EXIT_FAILURE);
    }
    if(argc == 3){
        plog_file = fopen(argv[2], "w");
    } else {
        plog_file = fopen("logfile.txt", "w");
    }
    plog_file = stdout;
    stderr = plog_file;
}

inline int setUpConnections(struct SERVER_INFO* pserver_data, char* port_numc){
    pserver_data->cli_len = sizeof(pserver_data->cli_adr);
    memset(&pserver_data->serv_add, 0, sizeof((pserver_data->serv_add)));

    uint16_t port_num = (uint16_t)atoi(port_numc);
    pserver_data->serv_add.sin_family = AF_INET;
    pserver_data->serv_add.sin_addr.s_addr = INADDR_ANY;
    pserver_data->serv_add.sin_port = htons(port_num);

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        error("Error opening socket.");
    }

    if(bind(sockfd, (struct sockaddr*) (&pserver_data->serv_add), sizeof(pserver_data->serv_add)) < 0){
        error("Binding failed.");
    }
    return sockfd;
}

在作业功能中,服务器似乎一遍又一遍地打印客户端的最后一条消息。

void* job_read(void * p){

    char** pc = (char**)p; //allow pointer arithmetic
    int new_sock_fd = *((int*) (pc[1]));  //Casting
    struct JOB_INFO* pjob_data = ((struct JOB_INFO*) (pc[0])); //Casting

    ssize_t n; //Error catching variable
    ssize_t m; //Error catching variable
    char buffer[BUFLEN];
    char name[MAX_NAME_LENGTH];
    sprintf(name, "Client %d: ", new_sock_fd);
    fprintf(plog_file, "Reading from %s\n", name);

    while(!pjob_data->end_of_program){
        n = read(new_sock_fd, buffer, BUFLEN-1);
        if ( n > 0 ) buffer[n] = '\0';  // add a \0 on success
        if(n<0){
            printf("Buffer: %s", buffer);
            error("Reading Failed");
        }
        pthread_mutex_lock(&pjob_data->socketids_changingMutex);
        if(n == 0){
            delete_socket(new_sock_fd, pjob_data);
            pthread_exit(NULL);
        }

        for(int i = 0; i < pjob_data->open_cnncts; i++){
            if(pjob_data->socket_ids[i] == new_sock_fd){
                continue;
            }
            m = write(pjob_data->socket_ids[i], name, strlen(name));
            if((m < 0)){
                error("Writing name failed");
            }
            n = write(pjob_data->socket_ids[i], buffer, strlen(buffer));
            if((n < 0)){
                error("Writing message failed");
            }
        }
        pthread_mutex_unlock(&pjob_data->socketids_changingMutex);
        printf("%s%s", name, buffer);
        fflush(stdout); //For unknown reason this is needed.
    }
    delete_socket(new_sock_fd, pjob_data);
    free(p);
    pthread_exit( NULL );
}

有两件事。在这个函数中,name 变量似乎不时消失。她没有被打印或发送。但据我所知, name 变量总是被初始化的。

重现错误:

编译服务器和客户端(下面的代码)。启动服务器并打开一个端口:

./Server 9999

启动客户端:

./Client 127.0.0.1 9999

在客户端中输入一些内容。通过关闭终端退出客户端。服务器出现故障。

谢谢!

客户代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include <stdbool.h>

#define BUFLEN 255

bool endprogram = false;
int sockfd;

void error(const char* msg){
    perror(msg);
    exit(1);
}

void* job_read(void* p){
    char buffer[BUFLEN];
    while(!endprogram){
        bzero(buffer, BUFLEN);
        ssize_t n = read(sockfd, buffer, (BUFLEN));
        if(n < 0){
            error("Error on reading");
        }
        printf("%s", buffer);
        int i = strncmp("Bye", buffer, 3);
        if(i == 0){
            endprogram = true;
            pthread_exit(NULL);
        }
    }
   pthread_exit(NULL);
}

int main(int argc, const char * argv[]) {
    pthread_t readt;

    int16_t portnum;
    struct sockaddr_in serveraddr;
    struct hostent* server;

    if(argc < 3){
        perror("You shall provide a port and a ip adress");
    }
    portnum = atoi(argv[2]);
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        error("Error opening socket");
    }

    server = gethostbyname(argv[1]);
    if(!server){
        error("No such host");
    }

    bzero((char*)&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    bcopy((char *)server->h_addr_list[0], (char *)&serveraddr.sin_addr.s_addr, sizeof(server->h_length));
    serveraddr.sin_port = htons(portnum);

    if(connect(sockfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr))<0){
        error("Connection failed");
    }

    pthread_create(&readt, NULL, &job_read, NULL);

    ssize_t n;
    char buffer[BUFLEN];
    while(!endprogram){
        fgets(buffer, BUFLEN, stdin);
        n = write(sockfd, buffer, strlen(buffer));
        if(n < 0){
            error("Error on writing");
        }
        n = strcmp(buffer, "Bye");
        if(n == 0){
            endprogram = true;
        }
    }
    pthread_join(readt, NULL);
    close(sockfd);
    return 0;
}

编辑:

真的很难减少问题,因为我需要所有这些方法来解决问题或维护核心功能。但是,这是第一个减少(不可编译),可能存在错误。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdbool.h>

#define BUFLEN 255
#define MAX_CONNECTIONS 128
#define MAX_NAME_LENGTH 30

struct JOB_INFO {
    int socket_ids[MAX_CONNECTIONS];
    bool end_of_program;
    int open_cnncts;
    pthread_mutex_t socketids_changingMutex;
};

struct SERVER_INFO {
    struct sockaddr_in serv_add;
    struct sockaddr_in cli_adr;
    socklen_t cli_len;
    pthread_t write_thread;
};
//Global Variables
FILE* plog_file;

void delete_socket(int socketid, struct JOB_INFO* pjob_data);
void* job_read(void*);
void error(const char* msg);
void enterConnectingLoop(struct SERVER_INFO* pserver_data, struct JOB_INFO* pjob_data, int sockfd);

int main(int argc, char* argv[]) {

    int sock_fd;
    struct JOB_INFO job_data;
    job_data.end_of_program = false;
    job_data.open_cnncts = 0;

    struct SERVER_INFO server_data;
    enterConnectingLoop(&server_data, &job_data, sock_fd);

    job_data.end_of_program = true;
    close(sock_fd);

    pthread_join(server_data.write_thread, NULL);
    pthread_mutex_destroy(&job_data.socketids_changingMutex);
    return EXIT_SUCCESS;
}

void* job_read(void * p){

    char** pc = (char**)p; //allow pointer arithmetic
    int new_sock_fd = *((int*) (pc[1]));  //Casting
    struct JOB_INFO* pjob_data = ((struct JOB_INFO*) (pc[0])); //Casting

    ssize_t n; //Error catching variable
    ssize_t m; //Error catching variable
    char buffer[BUFLEN];
    char name[MAX_NAME_LENGTH];
    sprintf(name, "Client %d: ", new_sock_fd);
    fprintf(plog_file, "Reading from %s\n", name);

    while(!pjob_data->end_of_program){
        n = read(new_sock_fd, buffer, BUFLEN-1);
        if ( n > 0 ) buffer[n] = '\0';  // add a \0 on success
        if(n<0){
            printf("Buffer: %s", buffer);
            error("Reading Failed");
        }
        pthread_mutex_lock(&pjob_data->socketids_changingMutex);
        if(n == 0){
            delete_socket(new_sock_fd, pjob_data);
            pthread_exit(NULL);
        }

        for(int i = 0; i < pjob_data->open_cnncts; i++){
            if(pjob_data->socket_ids[i] == new_sock_fd){
                continue;
            }
            m = write(pjob_data->socket_ids[i], name, strlen(name));
            if((m < 0)){
                error("Writing name failed");
            }
            n = write(pjob_data->socket_ids[i], buffer, strlen(buffer));
            if((n < 0)){
                error("Writing message failed");
            }
        }
        printf("%s%s", name, buffer);
        pthread_mutex_unlock(&pjob_data->socketids_changingMutex);
        fflush(stdout); //For unknown reason this is needed.
    }
    delete_socket(new_sock_fd, pjob_data);
    free(p);
    pthread_exit( NULL );
}

void enterConnectingLoop(struct SERVER_INFO* pserver_data, struct JOB_INFO* pjob_data, int sockfd){
    listen(sockfd, MAX_CONNECTIONS);

    for(pjob_data->open_cnncts = 0; (!pjob_data->end_of_program); /*mutex needs to be set*/ ){
        void** p = malloc(2*sizeof(void*));
        p[0] = (void*)pjob_data;
        p[1] = (void*)&(pjob_data->socket_ids[pjob_data->open_cnncts]);
        pjob_data->socket_ids[pjob_data->open_cnncts] =
                accept(sockfd, (struct sockaddr*) &pserver_data->cli_adr, &pserver_data->cli_len);
        pthread_mutex_lock(&(pjob_data->socketids_changingMutex)); //Note you cannot mutex the accept()
        printf("SOCKETFD %d\n", (pjob_data->socket_ids[pjob_data->open_cnncts]));
        fprintf(plog_file,"Client connected.\n");
        pthread_t thread;
        pthread_create(&thread , NULL, job_read, p);
        pjob_data->open_cnncts++;
        pthread_mutex_unlock(&(pjob_data->socketids_changingMutex));
    }
}

void delete_socket(int socketid, struct JOB_INFO* pjob_data){
    bool found = false;
    for(int i = 0; i < pjob_data->open_cnncts; i++){
        if(found){
            pjob_data->socket_ids[i-1] = pjob_data->socket_ids[i];
        }
        if(pjob_data->socket_ids[i] == socketid){
            close(socketid);
            found = true;
        }
    }
    if(found){
        pjob_data->open_cnncts--;
    }
}

我将尝试设置一个程序,该程序将使用输入和输出文件在 pc 上执行相同的操作,从而围绕整个 inet-setup-monster ...

编辑编辑:

如果在 IDE 之外发生同样的事情,我会得到一个核心转储 - 而不是愚蠢的输出:类似于:

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

void* job_read(void* p){
    sleep(1000); //delay to ensure job_write is waiting
    FILE* pfile = ((FILE**)p)[0];
    while(1){
        char buffer[20];
        fgets(buffer, 19, pfile);
        printf("&s", buffer);
    }

}

void* job_write(void* p){
    FILE* pfile = ((FILE**)p)[0];
    fputs("hi\n", pfile);
    sleep(1200); //delay to ensure job_write is waiting
    pthread_exit(NULL); //FUCK THE OPEN CONNECTION
}

int main(int argc, char* argv[]) {
    (void)argv; //Compiler things
    (void)argc;
    printf("HERE");
    FILE* in = fopen("in.txt", "w");
    pthread_t firstread; //Simulate all threads in actual IO: Server
    pthread_t firstwrite; //Client

    FILE** p = malloc(sizeof(FILE*));
    p[0] = in;

    pthread_create(&firstwrite, NULL, job_write, (void*)&p);
    pthread_create(&firstread, NULL, job_read, (void*)&p);

    pthread_join(firstwrite, NULL);
}

我想这可能是我实际问题的解决方案。我的假设是我尝试编写一个不再存在的文件或类似的东西。但是,我想这是因为一个关闭的文件导致了这个错误

编辑 编辑 编辑

我可以肯定地说,问题是由 job_read 函数引起的。正如所评论的,一位聪明的用户意识到,如果终端退出,在实际进程被终止之前需要一些时间。这可能会导致错误之一。

标签: cmultithreadingunixserverposix

解决方案


stdin如果输入流关闭,此代码将一遍又一遍地重复读取的最后一行:

while(!pjob_data->end_of_program) {
    fgets(buffer, BUFLEN, stdin);

    pthread_mutex_lock(&pjob_data->socketids_changingMutex);
    for(int i = 0; i < pjob_data->open_cnncts; i++){
        m = write(pjob_data->socket_ids[i], name, strlen(name));
        if((m < 0)){
            error("Writing name failed");
        }
        n = write(pjob_data->socket_ids[i], buffer, strlen(buffer));
        if((n < 0)){
            error("Writing message failed");
        }
    }
    pthread_mutex_unlock(&pjob_data->socketids_changingMutex);
    if(strcmp("Bye\n", buffer) == 0){
        exit(EXIT_SUCCESS);
    }
}

你永远不会检查是否fgets()失败。当它这样做时,先前的内容buffer被写入套接字。然后循环一遍又一遍地重复,只要pjob_data->end_of_program不评估为真,buffer每次都发送内容。也可能很快。

编辑:

客户端代码中也存在同样的错误:

while(!endprogram){
    fgets(buffer, BUFLEN, stdin);
    n = write(sockfd, buffer, strlen(buffer));
    if(n < 0){
        error("Error on writing");
    }
    n = strcmp(buffer, "Bye");
    if(n == 0){
        endprogram = true;
    }
}

解决方法是检查返回值fgets(),如果NULL,则采取适当的行动。我还没有对代码进行足够的分析,无法知道“适当地”是什么意思。

pthread_exit()此代码在互斥锁锁定的情况下调用:

    pthread_mutex_lock(&pjob_data->socketids_changingMutex);
    if(n == 0){
        delete_socket(new_sock_fd, pjob_data);
        pthread_exit(NULL);
    }

在此之后任何锁定或解锁的尝试pjob_data->socketids_changingMutex都可能会静默失败或死锁。


推荐阅读