首页 > 解决方案 > 在 ZMQ REQ/REP 模式中回复时间过长时应用程序崩溃

问题描述

我正在编写一个通过 ZeroMQREQ/REP请求-回复通信原型与桌面应用程序交互的插件。我目前可以收到一个请求,但如果回复的速度不够快,应用程序似乎会崩溃。

我在生成的线程上收到请求并将其放入队列中。此队列在另一个线程中处理,其中应用程序定期调用处理函数。

消息正在正确接收和处理,但在函数的下一次迭代之前无法发送响应,因为在此之前我无法从应用程序获取数据。

当此函数条件为在下一次迭代中发送响应时,应用程序将崩溃。但是,如果我在收到请求后立即发送虚假数据作为响应,则在第一次迭代中,应用程序不会崩溃。

构造套接字

    zmq::socket_t socket(m_context, ZMQ_REP);
    socket.bind("tcp://*:" + std::to_string(port));

在生成的线程中接收消息

void ZMQReceiverV2::receiveRequests() {
    nInfo(*m_logger) << "Preparing to receive requests";
    while (m_isReceiving) {
        zmq::message_t zmq_msg;
        bool ok = m_respSocket.recv(&zmq_msg, ZMQ_NOBLOCK);
        if (ok) {
            // msg_str will be a binary string
            std::string msg_str;
            msg_str.assign(static_cast<char *>(zmq_msg.data()), zmq_msg.size());
            nInfo(*m_logger) << "Received the message: " << msg_str;
            std::pair<std::string, std::string> pair("", msg_str);
            // adding to message queue
            m_mutex.lock();
            m_messages.push(pair);
            m_mutex.unlock();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    nInfo(*m_logger) << "Done receiving requests";
}

单独线程上的处理函数


void ZMQReceiverV2::exportFrameAvailable()
    // checking messages
    // if the queue is not empty
    m_mutex.lock();
    if (!m_messages.empty()) {
        nInfo(*m_logger) << "Reading message in queue";
        smart_target::SMARTTargetCreateRequest id_msg;
        std::pair<std::string, std::string> pair = m_messages.front();
        std::string topic   = pair.first;
        std::string msg_str = pair.second;
        processMsg(msg_str);
        // removing just read message
        m_messages.pop(); 
        //m_respSocket.send(zmq::message_t()); wont crash if I reply here in this invocation
    }
    m_mutex.unlock();

    // sending back the ID that has just been made, for it to be mapped
    if (timeToSendReply()) {
        sendReply();  // will crash, if I wait for this to be exectued on next invocation
    }
}

我的研究表明,发送响应没有时间限制,所以这似乎是时间问题,很奇怪。

有什么我遗漏的东西可以让我在处理函数的第二次迭代中发送响应吗?

修订版 1:

我已经编辑了我的代码,因此响应套接字只存在于一个线程上。由于我需要从处理函数中获取信息来发送,我创建了另一个队列,它在修改后的函数中检查在它自己的线程上运行。

void ZMQReceiverV2::receiveRequests() {
    zmq::socket_t socket = setupBindSocket(ZMQ_REP, 5557, "responder");
    nInfo(*m_logger) << "Preparing to receive requests";
    while (m_isReceiving) {
        zmq::message_t zmq_msg;
        bool ok = socket.recv(&zmq_msg, ZMQ_NOBLOCK);
        if (ok) {
            // does not crash if I call send helper here
            // msg_str will be a binary string
            std::string msg_str;
            msg_str.assign(static_cast<char *>(zmq_msg.data()), zmq_msg.size());
            NLogger::nInfo(*m_logger) << "Received the message: " << msg_str;
            std::pair<std::string, std::string> pair("", msg_str);
            // adding to message queue
            m_mutex.lock();
            m_messages.push(pair);
            m_mutex.unlock();
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        if (!sendQueue.empty()) {
            sendEntityCreationMessage(socket, sendQueue.front());
            sendQueue.pop();
        }
    }
    nInfo(*m_logger) << "Done receiving requests";
    socket.close();
}

该函数sendEntityCreationMessage()是一个帮助函数,最终调用socket.send().

void ZMQReceiverV2::sendEntityCreationMessage(zmq::socket_t &socket, NUniqueID id) {
    socket.send(zmq::message_t());
}

此代码似乎遵循套接字的线程安全准则。有什么建议么?

标签: c++zeromq

解决方案


“有什么我想念的吗”


的,ZeroMQ 传播,被称为 Zen-of-Zero,因为曾经提倡永远不要尝试共享 Socket 实例,永远不要尝试阻止,永远不要期望世界按照一个愿望行事。

这就是说,避免从任何非本地线程接触相同的 Socket 实例,除了已经实例化并拥有套接字的线程。

最后但并非最不重要的一点是,REQ/REP-Scalable Formal Communication Pattern Archetype 很容易陷入僵局,因为必须遵守强制性的两步舞 - 其中必须保持调用.send()- .recv()- .send()- .recv()- .send()-方法的交替顺序...,否则有限状态自动机 (FSA)的主要串联将无可挽回地最终处于 dFSA 的相互自死锁状态。


如果有人计划在 ZeroMQ 上进行专业构建,最好的下一步是重新阅读精彩的 Pieter HINTJENS 的著作“Code Connected: Volume 1”。一本硬读,但绝对值得花时间、汗水、泪水和努力。


推荐阅读