python - 为什么 pyzmq 订阅者与 asyncio 的行为不同?
问题描述
我有一个 XPUB/XSUB 设备和许多模拟发布者在一个进程中运行。在一个单独的过程中,我想连接一个订阅者并将收到的消息打印到终端。下面我将展示一个简单函数的两个变体来做到这一点。我将这些函数包装为命令行实用程序。
我的问题是 asyncio 变体从不接收消息。
另一方面,非异步变体工作得很好。我已经测试了 ipc 和 tcp 传输的所有情况。发布过程在我的测试中永远不会改变,除非我重新启动它以更改传输。消息是短字符串,大约每秒发布一次,因此我们不考虑性能问题。
订户程序无限期地在线msg = await sock.receive_multipart()
。在 XPUB/XSUB 设备中,我有显示sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
消息转发的工具,与非异步变体连接时相同。
asyncio 变体(不工作,如上所述)
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
async def task():
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
asyncio.run(task())
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
常规阻塞变体(工作正常)
def subs(url, channel):
import zmq
ctx = zmq.Context.instance()
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
def task():
while True:
msg = sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
try:
task()
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
对于这个特定的工具,不需要使用 asyncio。但是,我在代码的其他地方也遇到了这个问题,异步recv 永远不会收到。所以我希望通过在这个简单的案例中清理它,我会理解一般出了什么问题。
我的版本是
import zmq
zmq.zmq_version() # '4.3.2'
zmq.__version__ # '19.0.2'
我在 MacOS 10.13.6 上。
我完全没有想法。互联网,请帮助!
解决方案
一个有效的异步变体是
def subs(url, channel):
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context.instance()
async def task():
sock = ctx.socket(zmq.SUB)
sock.connect(url)
sock.setsockopt(zmq.SUBSCRIBE, channel.encode())
try:
while True:
msg = await sock.recv_multipart()
print(' | '.join(m.decode() for m in msg))
finally:
sock.setsockopt(zmq.LINGER, 0)
sock.close()
asyncio.run(task())
我得出的结论是,当使用 asyncio zmq 时,必须使用在事件循环上运行的调用来创建套接字,从该循环中等待套接字。尽管原始形式没有对事件循环做任何花哨的事情,但似乎套接字有一个与asyncio.run
. 我不知道为什么,我没有打开 pyzmq 的问题,因为他们的文档显示了这个答案中的用法,没有评论。
编辑以回应评论:
asyncio.run
总是创建一个新的事件循环,因此可能为在传递给的协同例程之外实例化的套接字创建的循环asyncio.run
(如原始问题中的 asyncio 变体)显然是不同的。
推荐阅读
- c++ - 检查两个字符串是否相互排列
- java - 在杰克逊响应中有命名空间前缀
- python - Python 3 变量作用域(2 个测试用例):y4 是局部变量吗?分配前是否引用了 y4?
- c++ - 模板、继承、静态成员递增
- core-data - 核心数据和 cloudkit 同步 wwdc 2019 不适用于 beta 3
- php - 使用 PHP 和 MySQL 将图像文件上传并存储在数据库中并带有描述
- machine-learning - 如何查看每个单词的 tf-idf 分数
- java - 什么循环更有效?
- javascript - 如何在 package.js 中声明一个外部包
- r - 如何根据多个其他变量的值计算 R 中的新变量?