c++ - 即使设置了 ZMQ_LINGER,ZMQ_HEARTBEAT_TTL 也不会丢弃传出队列
问题描述
我有一个使用 aZMQ_ROUTER
与ZMQ_DEALER
客户端通信的服务器。我在客户端套接字上设置了ZMQ_HEARTBEAT_IVL
andZMQ_HEARTBEAT_TTL
选项,以使客户端和服务器相互乒乓。此外,由于该ZMQ_HEARTBEAT_TTL
选项,如果服务器在一段时间内未收到来自客户端的任何 ping,则服务器将超时连接,根据 zmq 手册页:
ZMQ_HEARTBEAT_TTL 选项应设置远程对等端的 ZMTP 心跳超时。如果此选项大于 0,如果远程端在 TTL 周期内没有接收到更多流量,则将超时连接。如果 ZMQ_HEARTBEAT_IVL 未设置或为 0,则此选项无效。在内部,此值向下舍入到最接近的十进制,任何小于 100 的值都将无效。
因此,我期望服务器的行为是,当它在一段时间内没有收到来自客户端的任何流量时,它将关闭与该客户端的连接并在延迟时间到期后丢弃传出队列中的所有消息。我创建了一个玩具示例来检查我的假设是否正确,事实证明它不是。事件链如下:
- 服务器向客户端发送一堆数据。
- 客户端接收和处理数据,速度很慢。
- 所有发送命令都成功返回。
- 当客户端仍在接收数据时,我拔掉了网线。
- 几秒钟后(由
ZMQ_HEARTBEAT_TTL
选项设置),服务器开始向客户端发送 FIN 信号,这些信号没有被回复。 - 即使过了一段时间,传出的消息也不会被丢弃(我检查内存消耗)。
zmq_close
只有当我调用路由器套接字时,它们才会被丢弃。
所以我的问题是,这是否应该是如何使用 ZMQ 心跳机制的?如果不是,那么我想要实现的目标有什么解决方案吗?我认为我可以自己做心跳而不是使用内置的 ZMQ。但是,即使我这样做,ZMQ 似乎也没有提供一种方法来关闭 ZMQ_ROUTER 和 ZMQ_DEALER 之间的连接,尽管 ZMQ_ROUTER 的另一个版本 - ZMQ_STREAM提供了一种方法,通过发送一个身份帧后跟一个空帧来实现这一点。
玩具示例如下,任何帮助将不胜感激。
服务器端:
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
int main(int argc, char **argv)
{
void *context = zmq_ctx_new();
void *router = zmq_socket(context, ZMQ_ROUTER);
int router_mandatory = 1;
zmq_setsockopt(router, ZMQ_ROUTER_MANDATORY, &router_mandatory, sizeof(router_mandatory));
int hwm = 0;
zmq_setsockopt(router, ZMQ_SNDHWM, &hwm, sizeof(hwm));
int linger = 3000;
zmq_setsockopt(router, ZMQ_LINGER, &linger, sizeof(linger));
char bind_addr[1024];
sprintf(bind_addr, "tcp://%s:%s", argv[1], argv[2]);
if (zmq_bind(router, bind_addr) == -1) {
perror("ERROR");
exit(1);
}
// Receive client identity (only 1)
zmq_msg_t identity;
zmq_msg_init(&identity);
zmq_msg_recv(&identity, router, 0);
zmq_msg_t dump;
zmq_msg_init(&dump);
zmq_msg_recv(&dump, router, 0);
printf("%s\n", (char *) zmq_msg_data(&dump)); // hello
zmq_msg_close(&dump);
char buff[1 << 16];
for (int i = 0; i < 50000; ++i) {
if (zmq_send(router, zmq_msg_data(&identity),
zmq_msg_size(&identity),
ZMQ_SNDMORE) == -1) {
perror("ERROR");
exit(1);
}
if (zmq_send(router, buff, 1 << 16, 0) == -1) {
perror("ERROR");
exit(1);
}
}
printf("OK IM DONE SENDING\n");
// All send commands have returned successfully
// While the client is still receiving data, I unplug the intenet cable on the client machine
// After a while, the server starts sending FIN signals
printf("SLEEP before closing\n"); // At this point, the messages are not discarded (memory usage is high).
getchar();
zmq_close(router);
zmq_ctx_destroy(context);
}
客户端:
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
int main(int argc, char **argv)
{
void *context = zmq_ctx_new();
void *dealer = zmq_socket(context, ZMQ_DEALER);
int heartbeat_ivl = 3000;
int heartbeat_timeout = 6000;
zmq_setsockopt(dealer, ZMQ_HEARTBEAT_IVL, &heartbeat_ivl, sizeof(heartbeat_ivl));
zmq_setsockopt(dealer, ZMQ_HEARTBEAT_TIMEOUT, &heartbeat_timeout, sizeof(heartbeat_timeout));
zmq_setsockopt(dealer, ZMQ_HEARTBEAT_TTL, &heartbeat_timeout, sizeof(heartbeat_timeout));
int hwm = 0;
zmq_setsockopt(dealer, ZMQ_RCVHWM, &hwm, sizeof(hwm));
char connect_addr[1024];
sprintf(connect_addr, "tcp://%s:%s", argv[1], argv[2]);
zmq_connect(dealer, connect_addr);
zmq_send(dealer, "hello", 6, 0);
size_t size = 0;
int i = 0;
while (size < (1ll << 16) * 50000) {
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, dealer, 0) == -1) {
perror("ERROR");
exit(1);
}
size += zmq_msg_size(&msg);
printf("i = %d, size = %ld, total = %ld\n", i, zmq_msg_size(&msg), size); // This causes the cliet to be slow
// Somewhere in this loop I unplug the internet cable.
// The client starts sending FIN signals as well as trying to reconnect. The recv command hangs forever.
zmq_msg_close(&msg);
++i;
}
zmq_close(dealer);
zmq_ctx_destroy(context);
}
PS:我知道将高水位标记设置为无限制是不好的做法,但是我认为即使高水位标记很低,问题也会一样,所以我们暂时忽略它。
解决方案
推荐阅读
- java - 如何在 jar 中包含 Groovy 扩展方法?
- postgresql - 默认角色无法登录 Postgres 10
- python - 计算蛋白质序列的所有可能的 RNA 密码子组合
- javascript - 尝试将 Ionic 文件作为 blob 从 android 设备加载时出现 NOT_FOUND_ERR
- javascript - href 使用 .attr()、.prop()、.each() 等返回未定义但能够返回值
- ios - 如何从 Firestore 中检索 PKDrawing
- python - Spotipy 高请求率
- python - Pandas 和 for 循环:矢量化操作和加速 python 代码
- php - 你能在 JSON 中嵌入“命令”吗?
- vb.net - 从 MDI Parent 调用的 MDI Child 表单立即停用