python - 带有 aiozmq 流的简单 PUB/SUB
问题描述
我正在尝试使用 aiozmq 流制作一个简单的 PUB/SUB(由于某些原因我不想使用 aiozmq rpc)但没有成功:
发布.py
# coding: utf-8
import asyncio
import time
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.PUB,
bind='tcp://127.0.0.1:5556',
)
while True:
await asyncio.sleep(1)
msg = [str(time.time()).encode()]
print('write ', msg)
stream.write(msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
子.py
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
执行 pub.py 时:
python pub.py
write [b'1534927086.914483']
write [b'1534927087.9154818']
write [b'1534927088.9164672']
然后执行 sub.py:
python sub.py
wait ...
我错过了什么?
解决方案
只需错过sub.py
. 有一个工作 sub.py:
# coding: utf-8
import asyncio
import aiozmq
import zmq
async def do():
stream = await aiozmq.stream.create_zmq_stream(
zmq_type=zmq.SUB,
connect='tcp://127.0.0.1:5556',
)
stream.transport.subscribe(b'')
while True:
print('wait ...')
msg = await stream.read()
print('received ', msg)
loop = asyncio.get_event_loop()
loop.run_until_complete(do())
谁生产:
python sub.py
wait ...
received [b'1534927504.0462704']
wait ...
received [b'1534927505.0478334']
推荐阅读
- c# - Unity - 异步/等待太慢
- mysql - 在不关闭防火墙的情况下修复 ECONNREFUSED?
- serilog - 无法使用 Serilog 将最小日志记录级别设置为 HttpClients 的警告
- arrays - 打字稿:从联合类型获取合并/组合类型
- c# - 如何在初始化时向 GameObjects 添加组件?
- c# - C# 9 记录有来自 ToString() 的 {propertyName=value} 输出,如何从中获取,返回记录类型?
- r - R 中的激光雷达数据分析 - 如何在 las 中引用 XY 坐标?
- c++ - 我可以稍后推导出模板参数吗?
- reactjs - 在反应钩子页面中迭代时,我们如何在 nomineeFirstName 和 nomineeLastName 之间添加一个空格
- python - 大型 csv 文件:MemoryError: Unable to allocate 3.25 GiB for an array with shape (7, 62388743) and data type object