python - 订阅者接收消息缓慢
问题描述
我有一个 pyzmq Publisher,它每秒发送大约 1000 条消息。我正在尝试在 asyncio event_loop 中启动大约 10 个订阅者。
它可以工作,但比唯一一个订阅者的速度慢大约 2.5 倍。
代码可能有什么问题?
import asyncio
import zmq
import json
from zmq.backend.cython.constants import NOBLOCK
from zmq.asyncio import Context, Poller
from loop_ import Loop
class Client:
REQUEST_TIMEOUT = 35000
SERVER_ENDPOINT = "tcp://localhost:6666"
def __init__(self, id_):
self.id = id_
def get_task(self):
return asyncio.create_task(self.client_coroutine())
async def client_coroutine(self):
context = Context.instance()
socket = context.socket(zmq.SUB)
socket.connect(self.SERVER_ENDPOINT)
socket.setsockopt(zmq.SUBSCRIBE, b'4')
poller = Poller()
poller.register(socket, zmq.POLLIN)
while True:
event = dict(await poller.poll(self.REQUEST_TIMEOUT))
if event.get(socket) == zmq.POLLIN:
reply = await socket.recv_multipart(flags=NOBLOCK)
if not reply:
break
else:
print(eval(json.loads(reply[1].decode('utf-8'))))
else:
print("No response from server, retrying...")
socket.setsockopt(zmq.LINGER, 0)
socket.close()
poller.unregister(socket)
async def tasks():
_tasks = [Client(id_).get_task() for id_ in range(10)]
done, pending = await asyncio.wait(_tasks, return_when=asyncio.FIRST_EXCEPTION)
loop = asyncio.get_event_loop()
loop.run_until_complete(tasks())
解决方案
问:代码可能有什么问题?
鉴于代码使用的是相同的localhost
(从使用地址可以看出),第一个嫌疑人是,有 10 倍的工作要处理,这样的工作量总是会给localhost
O/S 和 CPU 带来压力,不是吗?
接下来是运输等级的选择。鉴于所有SUB
-s 都与 .s 位于同一位置localhost
,PUB
所有基于 L3 堆栈的 TCP/IP 协议工作都被浪费了。tcp://
为了比较相对成本(使用传输类进行此硬件单一消息传递的附加效果),使用传输类进行测试inproc://
,其中没有与协议相关的 TCP/IP 堆栈附加将进行处理。
最后但同样重要的是,我的代码永远不会混合不同的事件循环(从 v2.11 开始使用 ZeroMQ,所以有人可能会认为我在避免依赖async
最近 py3.6+ 中可用的 -decorated 功能方面有点过时)
我的代码将使用显式的、非阻塞的、零等待的测试来检测消息的存在,aSocketINSTANCE
而aSocketINSTANCE.poll( zmq.POLLIN, 0 )
不是使用任何“外部”添加的装饰,它可能会报告相同的结果,但是通过一些额外的(昂贵的和外部的)我的代码控制域)事件处理。所有实时、低延迟的用例都力求尽可能减少延迟/开销,因此在我的项目中使用显式控制总是会胜过任何“现代”的语法糖化技巧。
无论如何,享受零之禅