首页 > 解决方案 > 在官方 ZeroMQ 多线程示例的修改版本中崩溃

问题描述

我是 zmq 和 cppzmq 的新手。在尝试运行官方指南中的多线程示例时:http://zguide.zeromq.org/cpp: mtserver

我的设置

我遇到了一些问题。

问题 1

在指南中运行源代码时,它会永远挂起,而不会显示任何标准输出。

这是直接从指南复制的代码。

/*
    Multithreaded Hello World server in C
*/

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        zmq::message_t request;
        socket.recv (&request);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        socket.send (reply);
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (static_cast<void*>(clients),
                static_cast<void*>(workers),
                nullptr);
    return 0;
}

在我在worker的while循环中放置一个断点后,它很快就崩溃了。

问题 2

注意到编译器提示我替换不推荐使用的 API 调用,我修改了上面的示例代码以使警告消失。

/*
 Multithreaded Hello World server in C
 */

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <cstdio>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        std::array<char, 1024> buf{'\0'};
        zmq::mutable_buffer request(buf.data(), buf.size());
        socket.recv(request, zmq::recv_flags::dontwait);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %X\n", e.num());
        }
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");  // who i talk to.
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (clients, workers);
    return 0;
}

我并没有假装对原始损坏示例进行了直译,但我努力使事情编译和运行而没有明显的内存错误。

此代码不断给我9523DFB来自 try-catch 块的错误号(<code>156384763in Hex)。我在官方文档中找不到错误号的定义,但从这个问题 中得到它是本机 ZeroMQ 错误 EFSM:

The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state. This error may occur with socket types that switch between several states, such as ZMQ_REP.

如果有人能指出我做错了什么,我将不胜感激。

更新

我尝试根据@user3666197 的建议进行投票。但程序仍然挂起。插入任何断点都会有效地使程序崩溃,使其难以调试。

这是新的工人代码

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    zmq::pollitem_t items[1] = { { socket, 0, ZMQ_POLLIN, 0 } };

    while (true) {
        if(zmq::poll(items, 1, -1) < 1) {
            printf("Terminating worker\n");
            break;
        }

        //  Wait for next request from client
        std::array<char, 1024> buf{'\0'};
        socket.recv(zmq::buffer(buf), zmq::recv_flags::none);
        std::cout << "Received request: [" << (char*) buf.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %s\n", e.what());
        }
    }
    return (NULL);
}

标签: c++multithreadingzeromq

解决方案


欢迎来到零之禅的领域

嫌疑人#1:由于进入分布式有限状态自动机的错误定向状态,代码直接跳转到无法解析的活锁:

虽然我从那时起就提倡更喜欢非阻塞 . recv()-s,上面的代码使用这一步简单地自杀:

socket.recv( request, zmq::recv_flags::dontwait ); // socket being == ZMQ_REP

杀死任何其他未来生活的所有机会,但是 当且仅当先前的-ed 传递了真实的消息时 ,进入-able 状态的错误是可能 的。The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state.

.send().recv()


最好的下一步:

查看代码,可以使用.recv()before go to的阻塞形式,.send()或者更好的是使用 { blocking | non-blocking }-form of.poll( { 0 | timeout }, ZMQ_POLLIN )在进入尝试.recv()并继续做其他事情之前,如果还没有任何东西可以接收(以避免自我自杀将 dFSA 扔进可解决的碰撞中,stdout/stderr用第二间隔的流量淹没你printf(" ERROR: %X\n", e.num() );) _


错误处理:

更好地利用const char *zmq_strerror ( int errnum );被喂养int zmq_errno (void);


问题1:

与问题 2 根本原因中的自杀标志相反::dontwait,问题 2 根本原因是,.recv()这里的第一个阻塞形式将所有工作线程移动到不确定的长、可能无限的等待状态,因为.recv()-阻塞继续进行任何进一步的步骤,直到真正的消息到达(从 MCVE 看来,它永远不会到达),因此您的线程池保持在池范围内的阻塞等待状态,并且什么都不会直到任何消息到达。


更新REQ/REP工作原理:

Scalable Communication Pattern Archetype 就像一对分布式的REQ/REP人一样工作 - 一个人,让我们称她为 Mary,询问( Mary .send()-s the REQ),而另一个人,说 Bob theREP在一个可能无限长的阻塞中倾听.recv()(或采取应有的注意,用于.poll()有条不紊地定期检查玛丽是否询问过某事并继续做他自己的爱好或园艺)一旦鲍勃的一端收到消息,鲍勃就可以去.send()给玛丽回复(不是之前,因为他什么都不知道玛丽何时会(或不会)问什么以及在更近或更远的将来问什么))并且玛丽不会在REQ.send()任何时候尽快向鲍勃提出她的下一个问题,但在鲍勃(REP.send())回复并且玛丽收到鲍勃的消息之后(REQ.recv()) - 这比真实生活在一个屋檐下的真实人物中表现出来的更公平和对称:o)

编码?

该代码不是可重现的 MCVE。main()创建了五个 Bobs(挂起等待 Mary 打来的电话,在运输舱的某个地方)inproc://,但没有 Mary 打过电话,还是她?没有任何玛丽试图这样做的明显迹象,她(他们的,可能是一个(甚至是一个动态的)N:M herd-of-Mary(s):herd-of-5-Bobs 关系的社区)尝试越少( s) 处理来自 5-Bobs 之一的 REP-ly(s)。

坚持不懈,ZeroMQ 花了我一些时间摸不着头脑,但在我认真学习零之禅之后的几年里,仍然是在天堂花园中一次有益的永恒漫步。没有 localhost 串行代码 IDE 将能够“调试”分布式系统(除非分布式检查器基础设施到位,分布式系统监视器/跟踪器/调试器的适当架构是分布式消息传递/信号层的另一层在调试的分布式消息传递/信令系统之上——所以不要指望它来自一个微不足道的 localhost 串行代码 IDE。

如果仍有疑问,请隔离潜在的麻烦制造者 - 替换inproc://tcp://,如果玩具无法使用tcp://(可以通过有线方式追踪消息),则不会使用inproc://记忆区技巧。


推荐阅读