caching - zeromq: ZMQ_CONFLATE==1 不会阻止队列保存旧消息
问题描述
使用 ZeroMQ 和 CPPZMQ 4.3.2,我想删除所有套接字的旧消息,包括
- 一对
- 发布/订阅
- 请求/代表
所以我m_socks[channel].setsockopt(ZMQ_CONFLATE, 1)
在绑定/连接之前在我的所有套接字上使用。
测试
但是,当我进行以下测试时,似乎每次重新连接时旧消息仍会被清除。在本次测试中,
- 我使用一个线程继续将生成的正弦波发送到接收器线程
- 每 10 秒我将正弦波的频率加倍
- 然后在 10 秒后我停止该过程
下面是发件人的伪代码
// on sender end
auto thenSec = high_resolution_clock::now();
while(m_isRunning) {
// generate sinewave, double the frequency every 10s or so
auto nowSec = high_resolution_clock::now();
if (duration_cast<seconds>(nowSec - thenSec).count() > 10) {
m_sine.SetFreq(m_sine.GetFreq()*2);
thenSec = nowSec;
}
m_sine.Generate(audio);
// send to rendering thread
m_messenger.send("inproc://sound-ear.pair",
(const void*)(audio),
audio_size,
zmq::send_flags::dontwait
);
}
请注意,我已经使用 DONTWAIT 来减轻阻塞。
在接收方,我有一个zmq::poller_event
处理程序,它只接收事件轮询的最后一条消息。
在停止序列中,我将正弦波频率重置为最低值,例如 440Hz。
预期的
预期的行为将是:
- 如果我在频率翻倍的 10 秒后同时停止发送方和接收方,
- 我重新启动两者,
- 然后我应该看到正弦波重置为 440Hz。
观察到的
但是观察到的行为是重新开始通信后接收到的正弦波仍然是双倍频率,即880Hz。
问题
我做错了还是应该在这种情况下使用某种 killswitch 强制删除所有消息?
解决方案
好的,我想我自己解决了。有点儿。
实际解决方案
我终于意识到我想要的行为是在我停止渲染时刷新所有消息。根据官方文档(如何刷新 ZeroMQ 套接字队列中的所有消息?),这只能通过
- 将发送方和接收方
ZMQ_LINGER
选项的套接字设置为 0,这意味着关闭这些套接字时不保留任何内容; - 关闭发送端和接收端的套接字,这还涉及引导轮询器和对套接字的所有引用。
如果我要在停止序列之后立即重新开始渲染我的数据,这似乎是很多不必要的工作。但我发现没有其他方法可以干净地解决这个问题。
最初的努力
在我看来,这ZMQ_CONFLATE
对PAIR
. 我真的必须使用 ZMQ_SNDHWM
and来调整发送方和接收方端的高水位线ZMQ_RCVHWM
。
但是,我说“有点解决了”,因为最终调整 HWM 并不是实时应用程序的最佳解决方案,
将 ZMQ_SNDHWM / ZMQ_RCVHWM 设置为最小值“1”,就实时而言,我们仍然有相当大的延迟。
此外,消费者线程可能会陷入欠载情况,即具有最低 HWM 的可感知抖动。
如果我没有做错任何事情,我想最佳解决方案仍然是针对我的目标场景的共享内存。这很可悲,因为我真的很喜欢 ZMQ 的多播消息传递模式的简单性,并且讨厌处理到处都是线程锁定的问题。