首页 > 解决方案 > Python ZeroMQ PUSH/PULL 逻辑,将高水位标记设置为低端 puller 而不会丢失任何消息

问题描述

我正在使用简单的一对一PUSH/PULL工作者/服务器 python 代码来发送和接收消息。

工作人员使用PUSH套接字向PULL服务器发送消息。服务器处理单元不如worker强大,因此当发送太多消息时,服务器的RAM开始增长,直到系统杀死所有东西。

我尝试如下设置接收器高水位线:

worker_sock_in = ZMQ_CTXT.socket(zmq.PULL)
worker_sock_in.setsockopt(zmq.LINGER, 1000))
worker_sock_in.setsockopt(zmq.RCVTIMEO, 1000)) # detects if the link is broken
worker_sock_in.setsockopt(zmq.RCVHWM, 1000)
worker_sock_in_port = worker_sock_in.bind_to_random_port(listen_addr, port_start, port_end)

下面的代码用于工作人员创建和发送消息:

sock_dest = ZMQ_CTXT.socket(zmq.PUSH)
sock_dest.setsockopt(zmq.LINGER, 1000))
sock_dest.setsockopt(zmq.SNDTIMEO, 1000)) # detects if the link is broken
sock_dest.setsockopt(zmq.SNDHWM, 0) # never block on sending msg
sock_dest.connect(sock_dest_address)
# sends a msg
self.sock_dest.send(msg, zmq.NOBLOCK)

它似乎解决了这个问题,但我的猜测是溢出消息只是被服务器丢弃了,这在我的情况下是不可接受的。

我已经使用此线程进行了开发,但我不确定是否理解答案的附加信息部分。

所以问题是,在 noblock 推/拉 zeromq 套接字上达到的 HWM 的真实行为是什么?有没有办法让推拉基础设施保证所有发送的消息都将被拉套接字接收,而不会膨胀其内存或阻塞发送者?

标签: pythonzeromqdistributed-systempyzmqlow-latency

解决方案


有没有办法让推拉基础设施保证所有发送的消息都将被拉套接字接收,而不会膨胀其内存阻塞发送者?

离开?就在这里:

内置的ZeroWarranty(涵盖作为原始文件的 1:1 位副本交付或根本不交付的消息)将需要通过应用程序级协议进行扩展(涵盖未交付的重新发送,直到确认)或将您的基础设施转移到使用但特定的有保证的交付协议,这将有助于满足这个高于标准的要求 - 使用norm://传输类扩展并移动范式,如果PUSH/PULL仍然不在 RTO 状态进入PUB/SUB, XPUB/XSUB可扩展的正式通信模式原型。

中提供了一个新的传输选项libzmq。" "norm_engine.hpp" norm_engine.cpp"文件为 ZeroMQ 提供了面向 NACK 的可靠多播 (NORM) 传输协议选项的实现。NORM 是 RFC 5740 和支持文档中指定的 IETF 开放标准协议。海军研究实验室 (NRL) 提供了托管在http://www.nrl.navy.mil/itd/ncs/products/norm的开源参考实现。

NORM 支持通过 IP 多播进行可靠的数据传输,但也支持单播(点对点)数据传输。NORM 在用户数据报协议 (UDP) 之上运行,并通过基于 NACK 的自动重复请求 (ARQ) 支持可靠性,该请求使用数据包擦除编码进行非常有效的组通信。NORM 还提供自动 TCP 友好的拥塞控制和支持端到端流量控制的机制。NRL NORM 实现还可以配置为提供基本的类似 UDP 的尽力而为传输服务(没有接收器反馈),并且可以通过在传输中添加一些应用程序可设置的主动前向纠错 (FEC) 数据包来增强这一点。即,默认情况下,NORM 仅发送“反应性”FEC 修复数据包响应 NACK,但也可以配置为主动发送添加的修复数据包以获得一定程度的可靠性,而无需来自接收器的任何反馈。除了 TCP 友好的拥塞控制之外,NORM 还可以配置为固定速率操作,并且 NRL 实施支持一些额外的自动拥塞控制选项,适用于容易误码的无线通信环境。虽然其可靠的 ARQ 操作主要基于 NACK(检测到数据包丢失时的否定确认),但它还支持来自接收器的可选肯定确认 (ACK),可用于传递确认和显式流量控制。

膨胀内存要求有两种前进方式:一种是显式控制-er,而.send()不是淹没.send()-er 侧Context()实例的资源 (RAM),即在受限资源限制内(主要是防止任何淹没/丢弃消息的发生)完全),另一个 - 具有足够的 RAM 和正确配置Context()的实例,以让所有数据流过。


在 noblock 推/拉 zeromq 套接字上达到的 HWM的真实行为是什么?

首先,让我们揭开这个神秘面纱。ZMQ_NOBLOCK-directive 指向本地,.send()-sideContext()立即将对 -method 的调用返回.send()给调用者,即不阻塞调用代码执行(消息有效负载被放置在本地 ZeroMQ Context()-instance 中以供进一步处理,而不管它的内部状态 - 经典的非阻塞代码设计)。

ZMQ_SNDHWM相反,指示-instance Context(),如何设置此套接字的阈值以及对于所述PUSH/PULL .send()-er 情况:

高水位线是对未完成消息的最大数量的硬限制 ØMQ 应在内存中为指定套接字正在与之通信的任何单个对等方排队。零值意味着没有限制。

如果已达到此限制,则套接字将进入异常状态,并且根据套接字类型,ØMQ 应采取适当的措施,例如阻止或丢弃已发送的消息。zmq_socket(3)有关针对每种套接字类型采取的确切操作的详细信息, 请参阅 中的各个套接字描述。

ØMQ 不保证套接字会接受尽可能多的ZMQ_SNDHWM消息,实际限制可能会降低多达 60-70%,具体取决于套接字上的消息流。

也使用 aZMQ_XPUB_NODROP可能有助于norm://-transport-class 用例。

另请注意,默认情况下,ZMQ_PUSH-sockets 的 API 确认:

当一个ZMQ_PUSH套接字由于所有下游节点都达到高水位线而进入静音状态,或者如果根本没有下游节点,则zmq_send(3)套接字上的任何操作都应阻塞,直到静音状态结束或至少一个下游节点变为可发送;消息不会被丢弃。


对于表现不佳的嫌疑人(PULL-side ),还要在 O/S 端测试适当大小的设置,使用.getsockopt( ZMQ_RCVBUF )-method 并根据需要调整适当、足够大的大小.setsockopt( ZMQ_RCVBUF )

ZMQ_RCVBUF选项应将套接字的底层内核接收缓冲区大小设置为以字节为单位的指定大小。值 -1 表示保持操作系统默认值不变。有关详细信息,请参阅您的操作系统文档以了解SO_RCVBUF套接字选项。


如果以上没有任何帮助,可以将一个自我诊断的元平面注入到 ZeroMQ 基础设施中,使用zmq_socket_monitor服务并获得对通常发生在应用程序代码之外的情况的完全控制(反映内部 API 状态和转换根据需要)。

决定权在你。


推荐阅读