python - zero-mq如何获取队列容量和负载
问题描述
我在 ZeroMQ 中有一个发布者-订阅者架构。我使用 Python。
我需要能够判断某个队列何时将太满,并且最好能够对此采取一些措施。
我必须能够知道消息是否丢失。
但是,我无法找到有关此主题的相关文档。
希望得到一些帮助
谢谢!
解决方案
这是我所做的一个片段
def _flush_zmq_into_buffer(self):
# poll zmq for new messages. if any are found, put all possible items from zmq to self.buffer
# if many readers, need to use a read-write lock with writer priority
# http://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
while True:
self._iteration += 1
self._flush_zmq_once()
sleep(0.005)
def _take_work_from_buffer(self):
while True:
try:
if self._buffer.qsize() > 0:
work_message = self._buffer.get(block=True)
# can't have any heavy operation here! this has to be as lean as possible!
else:
sleep(0.01)
continue
except queue.Empty as ex:
sleep(0.01)
continue
self._work_once(work_message)
def _flush_zmq_once(self):
self.__tick()
flushed_messages = 0
for i in range(self.max_flush_rate):
try:
message = self._parse_single_zmq_message()
self._buffer.put(message, block=True) # must block. can't lose messages.
except zmq.Again: # zmq empty
flushed_messages = i
break
self._log_load(flushed_messages)
self.__tock()
self.__print_flushed(flushed_messages)
这允许我将 zmq 缓冲区刷新到我自己的缓冲区中,比我解析消息的速度快得多,因此不会丢失任何消息,并且需要支付延迟。
这也让我知道每个刷新周期从 zmq 刷新了多少消息,从而对负载有所了解。
使用轮询事件的原因是,对于高速率的传入消息,事件系统将比轮询系统更昂贵。最后一句话未经检验,但我相信这是真的。
推荐阅读
- linear-regression - r.regression.series 不适用于大量栅格
- python - 如何使用assert语句来确认python3集合的类型
- r - 按带有正负条的值的数量标记条形图
- symfony - Symfony 3.4:如何强制取消身份验证
- reactjs - 在 Windows 上安装现有反应项目时出现 ELIFECYCLE 错误
- python - 如何使用 Azure Cloud Function (python) 中的用户管理标识向 Azure KeyVault 发出请求?
- excel - 使用 VBA 根据工作表变量对范围进行排序
- java - 为什么在 TransactionStatus 上 Spring Boot 中未完成事务?
- mysql - 从左连接中选择正确信息时遇到问题
- javascript - “订阅”类型的参数不可分配给类型的参数