首页 > 解决方案 > 订阅者接收消息缓慢

问题描述

我有一个 pyzmq Publisher,它每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。

它可以工作,但比唯一一个订阅者的速度慢大约 2.5 倍。

代码可能有什么问题?

import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop


class Client:
    REQUEST_TIMEOUT = 35000
    SERVER_ENDPOINT = "tcp://localhost:6666"

    def __init__(self, id_):
        self.id = id_

    def get_task(self):
        return asyncio.create_task(self.client_coroutine())

    async def client_coroutine(self):
        context = Context.instance()

        socket = context.socket(zmq.SUB)
        socket.connect(self.SERVER_ENDPOINT)
        socket.setsockopt(zmq.SUBSCRIBE, b'4')
        poller = Poller()
        poller.register(socket, zmq.POLLIN)

        while True:
            event = dict(await poller.poll(self.REQUEST_TIMEOUT))
            if event.get(socket) == zmq.POLLIN:
                reply = await socket.recv_multipart(flags=NOBLOCK)
                if not reply:
                    break
                else:
                    print(eval(json.loads(reply[1].decode('utf-8'))))
            else:
                print("No response from server, retrying...")
                socket.setsockopt(zmq.LINGER, 0)
                socket.close()
                poller.unregister(socket)

async def tasks():
    _tasks = [Client(id_).get_task() for id_ in range(10)]
    done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)


loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())

标签: pythonpython-asynciopython-multithreadingpyzmq

解决方案


代码可能有什么问题?

鉴于代码使用的是相同的localhost(从使用地址可以看出),第一个嫌疑人是,有 10 倍的工作要处理,这样的工作量总是会给localhostO/S 和 CPU 带来压力,不是吗?

接下来是运输等级的选择。鉴于所有SUB-s 都与 .s 位于同一位置localhostPUB所有基于 L3 堆栈的 TCP/IP 协议工作都被浪费了。tcp://为了比较相对成本(使用传输类进行此硬件单一消息传递的附加效果),使用传输类进行测试inproc://,其中没有与协议相关的 TCP/IP 堆栈附加将进行处理。

最后但同样重要的是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖async最近 py3.6+ 中可用的 -decorated 功能方面有点过时)

我的代码将使用显式的、非阻塞的、零等待的测试来检测消息的存在,aSocketINSTANCEaSocketINSTANCE.poll( zmq.POLLIN, 0 )不是使用任何“外部”添加的装饰,它可能会报告相同的结果,但是通过一些额外的(昂贵的和外部的)我的代码控制域)事件处理。所有实时、低延迟的用例都力求尽可能减少延迟/开销,因此在我的项目中使用显式控制总是会胜过任何“现代”的语法糖化技巧。

无论如何,享受零之禅


推荐阅读