AMQP-CPP: Broken pipe error in TCP handler


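Unfortunately, in my project I always get the error message "Broken pipe" in the onError function of my event handler, and I never reach the onConnected state. The monitor function of the event handler is called twice with the flag AMQP::readable. After that it is called with no flags set, and that is when my pipe breaks.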



int Communicator_RabbitMQ::Open(string device)
{
    AMQP::Address address(AMQP::Address("amqp://test:test@localhost/"));

    // make a connection
    m_connection = std::make_shared<AMQP::TcpConnection>(&oCommunicator_RabbitMQ_Handler, address);

    // we need a channel too
    m_channel = std::make_shared<AMQP::TcpChannel>(m_connection.get());

    m_channel->declareExchange("my-exchange", AMQP::fanout);
    m_channel->bindQueue("my-exchange", "my-queue", "my-routing-key");

    m_channel->declareExchange("cyos_tx_exchange", AMQP::direct);
    m_channel->bindQueue("cyos_tx_exchange", "cyos_queue", "");

    return true;
}

string Communicator_RabbitMQ::Read()
{
    int result = 0;
    int maxfd = 1;

    struct timeval tv
    {
        1, 0
    };

    string returnValue; // return value of the method
    string message;     // message from RabbitMQ

    try
    {
        FD_SET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set);

        if (oCommunicator_RabbitMQ_Handler.m_fd != -1)
        {
            maxfd = oCommunicator_RabbitMQ_Handler.m_fd + 1;
        }

        result = select(FD_SETSIZE, &oCommunicator_RabbitMQ_Handler.m_set, NULL, NULL, &tv);

        if ((result == -1) && errno == EINTR)
        {
            TRACE(L"Error in socket");
        }
        else if (result > 0)
        {
            if (oCommunicator_RabbitMQ_Handler.m_flags & AMQP::readable)
                TRACE(L"Got something");

            if (FD_ISSET(oCommunicator_RabbitMQ_Handler.m_fd, &oCommunicator_RabbitMQ_Handler.m_set))
            {
                m_connection->process(oCommunicator_RabbitMQ_Handler.m_fd, oCommunicator_RabbitMQ_Handler.m_flags);
            }
        }
    }
    catch (exception e)
    {
        cout << e.what();
    }

    return "";
}


Here is the TCP event handler:

#pragma once

class Communicator_RabbitMQ_Handler : public AMQP::TcpHandler
{
public:
    /**
     *  Method that is called when the connection succeeded
     *  @param  connection      The connection that was established
     */
    virtual void onConnected(AMQP::TcpConnection* connection)
    {
        std::cout << "connected" << std::endl;
    }

    /**
     *  When the connection ends up in an error state this method is called.
     *  This happens when data comes in that does not match the AMQP protocol
     *  After this method is called, the connection no longer is in a valid
     *  state and can no longer be used. In normal circumstances this method is not called.
     *  @param  connection      The connection that entered the error state
     *  @param  message         Error message
     */
    virtual void onError(AMQP::TcpConnection* connection, const char* message)
    {
        // report error
        std::cout << "AMQP TCPConnection error: " << message << std::endl;
    }

    /**
     *  Method that is called when the connection was closed.
     *  @param  connection      The connection that was closed and that is now unusable
     */
    virtual void onClosed(AMQP::TcpConnection* connection)
    {
        std::cout << "closed" << std::endl;
    }

    /**
     *  Method that is called by AMQP-CPP to register a filedescriptor for readability or writability
     *  @param  connection  The TCP connection object that is reporting
     *  @param  fd          The filedescriptor to be monitored
     *  @param  flags       Should the object be monitored for readability or writability?
     */
    virtual void monitor(AMQP::TcpConnection* connection, int fd, int flags)
    {
        //TRACE(L"Communicator_RabbitMQ_Handler, monitor called, %d, %d, %x", fd, flags, &m_set);
        // we did not yet have this watcher - but that is ok if no filedescriptor was registered
        if (flags == 0)
        {
            return;
        }

        if (flags & AMQP::readable)
        {
            FD_SET(fd, &m_set);
            m_fd = fd;
            m_flags = flags;
        }
    }

    Communicator_RabbitMQ_Handler() = default;

    int m_fd = -1;
    int m_flags = 0;
    fd_set m_set;
};


RabbitMQ log entries:

2018-07-02 07:04:50.272 [info] <0.9653.0> accepting AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672)
2018-07-02 07:04:50.273 [warning] <0.9653.0> closing AMQP connection <0.9653.0> ([::1]:39602 -> [::1]:5672):

Tags: c++, events, tcp, rabbitmq, amqp


I eventually solved this by increasing the handshake timeout to 20 seconds in the rabbitmq.config file. I just added the following to that file:

handshake_timeout = 20000

The value is in milliseconds. The default is 10 seconds, which apparently was not enough for my setup.
