首页 > 解决方案 > 即使设置了 ZMQ_LINGER,ZMQ_HEARTBEAT_TTL 也不会丢弃传出队列

问题描述

我有一个使用 aZMQ_ROUTERZMQ_DEALER客户端通信的服务器。我在客户端套接字上设置了ZMQ_HEARTBEAT_IVLandZMQ_HEARTBEAT_TTL选项,以使客户端和服务器相互乒乓。此外,由于该ZMQ_HEARTBEAT_TTL选项,如果服务器在一段时间内未收到来自客户端的任何 ping,则服务器将超时连接,根据 zmq 手册页:

ZMQ_HEARTBEAT_TTL 选项应设置远程对等端的 ZMTP 心跳超时。如果此选项大于 0,如果远程端在 TTL 周期内没有接收到更多流量,则将超时连接。如果 ZMQ_HEARTBEAT_IVL 未设置或为 0,则此选项无效。在内部,此值向下舍入到最接近的十进制,任何小于 100 的值都将无效。

因此,我期望服务器的行为是,当它在一段时间内没有收到来自客户端的任何流量时,它将关闭与该客户端的连接并在延迟时间到期后丢弃传出队列中的所有消息。我创建了一个玩具示例来检查我的假设是否正确,事实证明它不是。事件链如下:

所以我的问题是,这是否应该是如何使用 ZMQ 心跳机制的?如果不是,那么我想要实现的目标有什么解决方案吗?我认为我可以自己做心跳而不是使用内置的 ZMQ。但是,即使我这样做,ZMQ 似乎也没有提供一种方法来关闭 ZMQ_ROUTER 和 ZMQ_DEALER 之间的连接,尽管 ZMQ_ROUTER 的另一个版本 - ZMQ_STREAM提供了一种方法,通过发送一个身份帧后跟一个空帧来实现这一点。

玩具示例如下,任何帮助将不胜感激。

服务器端:

#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int main(int argc, char **argv)
{
        void *context = zmq_ctx_new();
        void *router = zmq_socket(context, ZMQ_ROUTER);
        int router_mandatory = 1;
        zmq_setsockopt(router, ZMQ_ROUTER_MANDATORY, &router_mandatory, sizeof(router_mandatory));
        int hwm = 0;
        zmq_setsockopt(router, ZMQ_SNDHWM, &hwm, sizeof(hwm));
        int linger = 3000;
        zmq_setsockopt(router, ZMQ_LINGER, &linger, sizeof(linger));
        char bind_addr[1024];
        sprintf(bind_addr, "tcp://%s:%s", argv[1], argv[2]);
        if (zmq_bind(router, bind_addr) == -1) {
                perror("ERROR");
                exit(1);
        }
        
        // Receive client identity (only 1)
        zmq_msg_t identity;
        zmq_msg_init(&identity);
        zmq_msg_recv(&identity, router, 0);
        zmq_msg_t dump;
        zmq_msg_init(&dump);
        zmq_msg_recv(&dump, router, 0);
        printf("%s\n", (char *) zmq_msg_data(&dump)); // hello
        zmq_msg_close(&dump);        

        char buff[1 << 16];     
        for (int i = 0; i < 50000; ++i) {
                if (zmq_send(router, zmq_msg_data(&identity),
                                zmq_msg_size(&identity),
                                ZMQ_SNDMORE) == -1) {
                        perror("ERROR");
                        exit(1);
                }
                if (zmq_send(router, buff, 1 << 16, 0) == -1) {
                        perror("ERROR");
                        exit(1);
                }
        }
        printf("OK IM DONE SENDING\n");
        // All send commands have returned successfully
        // While the client is still receiving data, I unplug the intenet cable on the client machine
        // After a while, the server starts sending FIN signals
        printf("SLEEP before closing\n"); // At this point, the messages are not discarded (memory usage is high).
        getchar();
        zmq_close(router);
        zmq_ctx_destroy(context);                
}

客户端:

#include <zmq.h>
#include <stdlib.h>
#include <string.h>

int main(int argc, char **argv)
{
        void *context = zmq_ctx_new();
        void *dealer = zmq_socket(context, ZMQ_DEALER);
        int heartbeat_ivl = 3000;
        int heartbeat_timeout = 6000;
        zmq_setsockopt(dealer, ZMQ_HEARTBEAT_IVL, &heartbeat_ivl, sizeof(heartbeat_ivl));
        zmq_setsockopt(dealer, ZMQ_HEARTBEAT_TIMEOUT, &heartbeat_timeout, sizeof(heartbeat_timeout));
        zmq_setsockopt(dealer, ZMQ_HEARTBEAT_TTL, &heartbeat_timeout, sizeof(heartbeat_timeout));
        int hwm = 0;
        zmq_setsockopt(dealer, ZMQ_RCVHWM, &hwm, sizeof(hwm));
        char connect_addr[1024];
        sprintf(connect_addr, "tcp://%s:%s", argv[1], argv[2]);
        zmq_connect(dealer, connect_addr);
        zmq_send(dealer, "hello", 6, 0);
        size_t size = 0;
        int i = 0;
        while (size < (1ll << 16) * 50000) {
                zmq_msg_t msg;
                zmq_msg_init(&msg);
                if (zmq_msg_recv(&msg, dealer, 0) == -1) {
                        perror("ERROR");
                        exit(1);
                }
                size += zmq_msg_size(&msg);
                printf("i = %d, size = %ld, total = %ld\n", i, zmq_msg_size(&msg), size); // This causes the cliet to be slow
                // Somewhere in this loop I unplug the internet cable.
                // The client starts sending FIN signals as well as trying to reconnect. The recv command hangs forever.
                zmq_msg_close(&msg);
                ++i;                
        }
        zmq_close(dealer);
        zmq_ctx_destroy(context);        
}

PS:我知道将高水位标记设置为无限制是不好的做法,但是我认为即使高水位标记很低,问题也会一样,所以我们暂时忽略它。

标签: c++socketsserverclientzeromq

解决方案


推荐阅读