首页 > 解决方案 > zero-mq如何获取队列容量和负载

问题描述

我在 ZeroMQ 中有一个发布者-订阅者架构。我使用 Python。

我需要能够判断某个队列何时将太满,并且最好能够对此采取一些措施。

我必须能够知道消息是否丢失。

但是,我无法找到有关此主题的相关文档。

希望得到一些帮助

谢谢!

标签: pythonzeromqpyzmq

解决方案


这是我所做的一个片段

def _flush_zmq_into_buffer(self):
    # poll zmq for new messages. if any are found, put all possible items from zmq to self.buffer
    # if many readers, need to use a read-write lock with writer priority
    # http://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/
    while True:
        self._iteration += 1
        self._flush_zmq_once()
        sleep(0.005)

def _take_work_from_buffer(self):
    while True:
        try:
            if self._buffer.qsize() > 0:
                work_message = self._buffer.get(block=True)
                # can't have any heavy operation here! this has to be as lean as possible!
            else:
                sleep(0.01)
                continue
        except queue.Empty as ex:
            sleep(0.01)
            continue

        self._work_once(work_message)

def _flush_zmq_once(self):
    self.__tick()

    flushed_messages = 0
    for i in range(self.max_flush_rate):
        try:
            message = self._parse_single_zmq_message()
            self._buffer.put(message, block=True)  # must block. can't lose messages.
        except zmq.Again:  # zmq empty
            flushed_messages = i
            break

    self._log_load(flushed_messages)

    self.__tock()
    self.__print_flushed(flushed_messages)

这允许我将 zmq 缓冲区刷新到我自己的缓冲区中,比我解析消息的速度快得多,因此不会丢失任何消息,并且需要支付延迟。

这也让我知道每个刷新周期从 zmq 刷新了多少消息,从而对负载有所了解。

使用轮询事件的原因是,对于高速率的传入消息,事件系统将比轮询系统更昂贵。最后一句话未经检验,但我相信这是真的。


推荐阅读