首页 > 解决方案 > 在这个 mutex/pthread_cond_wait 结构中,我的数据会在哪里丢失?

问题描述

最终编辑:我选择的答案说明了问题的解决方案。代表性示例代码显示在此处的 diff 中

编辑:帖子底部的完整可编译代码。

我有这个简单的多线程服务器,它只接受一个连接,并且应该将文件描述符传递给一个线程,以允许该线程直接处理它,直到客户端断开连接。

出于某种原因,即使在服务器内部使用以下代码流,一些客户端也会“掉入裂缝”并陷入困境。(他们永远不会被服务器处理,所以他们只是在接受连接后挂起)

以下块是我的服务器主运行循环:

    while(g_serv.b_running)
    {
        //printf("Awaiting connection.\n");
        client_fd = accept(g_serv.serv_listener_fd,
                           (struct sockaddr*)&cli_addr,
                           &clilen);
        if (0 > client_fd)
        {
            fprintf(stderr,
                    "Error accepting connection. [%s]\n",
                    strerror(errno));
            continue;
        }


        err = sem_trywait(&(g_serv.client_count_sem));
        if (0 > err)
        {
            fprintf(stderr,
                    "Max connections reached. [%s]\n",
                    strerror(errno));
            notify_client_max_connections(client_fd);
            close(client_fd);
            client_fd = 0;
            continue;
        }

        printf("A client has connected.\n");

        char byte[2] = "0";
        err = send(client_fd, byte, 1, 0);

        // Set up client FD in global position and wake up a thread to grab it
        //
        pthread_mutex_lock(&(g_serv.new_connection_fd_lock));
        g_serv.new_connection_fd = client_fd;
        if (0 != g_serv.new_connection_fd)
        {
            pthread_cond_signal(&(g_serv.new_connection));
        }
        pthread_mutex_unlock(&(g_serv.new_connection_fd_lock));
    }

这个块是线程处理函数:

    void* thread_handler(void* args)
    {
        serv_t* p_serv = (serv_t*)args;
        bool    thread_client_connected;
        int     thread_client_fd;
        while(p_serv->b_running)
        {
            pthread_mutex_lock(&(p_serv->new_connection_fd_lock));
            while (0 == p_serv->new_connection_fd && p_serv->b_running)
            {
                pthread_cond_wait(&(p_serv->new_connection),
                                  &(p_serv->new_connection_fd_lock));
            }
            thread_client_fd = p_serv->new_connection_fd;
            p_serv->new_connection_fd = 0;
            pthread_mutex_unlock(&(p_serv->new_connection_fd_lock));
    
            // In the case of a pthread cond broadcast for exiting the server.
            //
            if (0 == thread_client_fd)
            {
                continue;
            }
            
            thread_client_connected = true;
            while (thread_client_connected)
            {
                thread_client_connected = handle_client(thread_client_fd);
            }
            close(thread_client_fd);
            thread_client_fd = 0;
            sem_post(&(p_serv->client_count_sem));
        }
        return NULL;
    } /* thread_handler */

仅供数据参考,这里是我的 serv_t 结构:

    typedef struct serv_t {
        bool            b_running;
        int             max_connections;
        int             serv_listener_fd;
        sem_t           client_count_sem;
        pthread_mutex_t new_connection_fd_lock;
        pthread_cond_t  new_connection;
        int             new_connection_fd;
        pthread_t*      p_thread_ids;
    } serv_t;

基本上,如果我运行 netcat 或客户端程序,我通过 bash 命令使用多个实例对它进行“后台”应用程序,其中一些实例会卡住。我让它将输出重定向到一个文件,但发生的事情是客户端/netcat 的特定实例在接受调用后刚刚卡住。

更具体地说,如果我用两个线程运行我的程序,程序的一个实例会卡住,并且后续副本不会卡住,即使在服务器上运行 6500 个实例也是如此。

如果我用 10 个线程运行它,多达 8 或 9 个实例会卡住,但这些线程仍然在服务器中正常运行。

编辑:

我指的客户端代码,从服务器开始,让客户端知道服务器已准备好接收数据:

    char buff[2] = { 0 };
    err = recv(client_socket_fd, buff, 1, 0);

    if ('0' != buff[0] && 1 != err)
    {
        fprintf(stderr,
                "Server handshake error. [%s]\n",
                strerror(errno));
        close(client_socket_fd);
        return EXIT_FAILURE;
    }

    if (NULL != p_infix_string)
    {
        if (MAX_BUFFER_SIZE < strlen(p_infix_string))
        {
            fprintf(stderr,
                    "Infix string is over 100 characters long.\n");
            return EXIT_FAILURE;
        }
        errno = 0;
        char* p_postfix = infix_to_postfix(p_infix_string);
        if (EINVAL == errno || NULL == p_postfix)
        {
            fprintf(stderr, "Error converting provided string.\n");
        }
        bool success = send_postfix(p_postfix, client_socket_fd);

        free(p_postfix);
        if (false == success)
        {
            fprintf(stderr,
                    "An error occured while sending the equation to the server.\n");
            close(client_socket_fd);
            return EXIT_FAILURE;
        }
    }

客户端在这里被卡在接收呼叫中:

bool send_postfix(char* p_postfix, int client_socket_fd)
{
    if (NULL == p_postfix)
    {
        fprintf(stderr, "No postfix string provided to send to server.\n");
        return false;
    }

    printf("Sending postfix to server\n");
    int err = send(client_socket_fd,
                   p_postfix,
                   strnlen(p_postfix, MAX_BUFFER_SIZE),
                   0);
    if(strnlen(p_postfix, MAX_BUFFER_SIZE) > err)
    {
        fprintf(stderr,
                "Unable to send message to server. [%s]\n",
                strerror(errno));
        return false;
    }

    char response[MAX_BUFFER_SIZE] = { 0 };
    printf("Waiting for receive\n");
    err = recv(client_socket_fd, &response, MAX_BUFFER_SIZE, 0);
    if (0 == err)
    {
        fprintf(stderr,
                "Connection to server lost. [%s]\n",
                strerror(errno));
        return false;
    }
    else if (0 > err)
    {
        fprintf(stderr,
                "Unable to receive message on socket. [%s]\n",
                strerror(errno));
        return false;
    }

    printf("Server responded with: \n%s\n", response);
    return true;
} /* send_postfix */

编辑:https ://github.com/TheStaplergun/Problem-Code 我将代码上传到这个 repo 并删除了对我使用的无关文件的需要并用占位符填充它们。您可以使用带有命令的服务器./postfix_server -p 8888 -n 2和另一个终端中的客户端问题重新创建此问题for i in {1..4}; do ./postfix_client -i 127.0.0.1 -p 8888 -e "3 + $i" &> $i.txt & done

由于客户端顶部的 setbuf,每个客户端的输出将被强制刷新。运行它,看看是否有任何程序挂起,如果没有再次运行该命令。只需键入 PS 并查看其中一个是否挂起,然后查看生成的文本文件。你会看到它卡在接听电话上。

如果您对服务器 ( CTRL + C) 进行签名,则被卡住的客户端将关闭并Connection reset by peer收到来自服务器的响应,因此服务器仍然将该文件描述符锁定在某个地方。

我相信比赛条件正在以某种方式发生,因为它只是随机发生的。

奇怪的是,每个服务器实例只发生一次。

如果我杀死那个挂起的实例并继续执行 10000 次,它永远不会再次挂起,直到服务器重置。

标签: cunixpthreads

解决方案


出于某种原因,即使在服务器内部使用以下代码流,一些客户端也会“掉入裂缝”并陷入困境。(他们永远不会被服务器处理,所以他们只是在接受连接后挂起)

可能还有其他问题,但我看到的第一个问题是主循环不能确保任何处理程序线程在尝试移交下一个连接之前实际拾取新连接。即使在接受新连接时 CV 上已经有处理程序线程阻塞,主服务器线程也有可能向 CV 发出信号,循环返回,接受另一个连接,重新获取互斥体,并覆盖新连接 FD在任何处理程序线程拿起前一个之前。如果您的线程数多于内核数,则这种可能性会增加。

请注意,这也会干扰您对可用处理程序的基于信号量的计数——您为每个接受的信号量减少信号量,但仅对成功处理的信号量再次增加它。

有多种方法可以让主服务器线程等待处理程序获取新连接。一组将涉及服务器等待 CV 本身,并在获取连接后依靠处理程序发出信号。另一种可能更简单的方法将涉及使用信号量来达到类似的效果。但我建议不要等待,而是为可用连接创建一个线程安全队列,这样服务器就不必等待。如果这对您有用的话,这甚至允许排队比目前可用的处理程序更多的连接。


推荐阅读