python - pyzmq 同时发送到多个客户端
问题描述
我有一个 Raspberry Pi 客户端和 4 个 Raspberry Pi 服务器。我希望客户端同时向所有 4 个服务器发送字符串消息以捕获图像。现在我正在按顺序使用类似下面的东西。
socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)
更改为发布/订阅模型会改善客户端接收消息的距离吗?我想让 4 个客户端在 5 毫秒或更短的时间内收到捕获消息。
解决方案
欢迎来到零之禅:
虽然我们获得零保修,但我们可能会在适当的方向上采取一些步骤。如果是 ZeroMQ 的新手,请随时阅读此处的一些帖子,并至少阅读“ ZeroMQ原则在不到 5 秒内”,然后再深入了解更多细节:
客户代码概念模板:
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context( 4 ) # request 4-I/O-threads
aPUB = aCtx.socket( zmq.PUB ) # PUB-instance
aPUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aPUB.setsockopt( zmq.SNDBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aPUB.setsockopt( zmq.SNDHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aPUB.setsockopt( zmq.IMMEDIATE, 1 ) # ignore L1/L2-incomplete(s)
aPUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-send "old"
aPUB.bind( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
# may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
# yields reduced sizes of the serialised <capture> data
# at costs of about ~30~60 [ms] on either side
# which may lower the network traffic and .SNDBUF-sizing issues
#----------------------------------------------------------------------
while <any reason>:
try:
aPUB.send( aPayLOAD, zmq.NOBLOCK )
except:
# handle as per errno ...
finally:
pass
#----------------------------------------------------------------------
aPUB.close()
aCtx.term()
服务器代码概念模板:
import zmq; print( zmq.zmq_version() ) # INF:
aCtx = zmq.Context() # request 4-I/O-threads
aSUB = aCtx.socket( zmq.SUB ) # SUB-instance
aSUB.setsockopt( zmq.LINGER, 0 ) # avoid deadlock on close
aSUB.setsockopt( zmq.RCVBUF, 3 * PayLoadSIZE ) # FullHD ~ 6 MB, 4K ~ ...
aSUB.setsockopt( zmq.RCVHWM, aNumOfPicsInQUEUE ) # 1, ~3? ~10?, !1000 ...
aSUB.setsockopt( zmq.IMMEDIATE, 1 ) # ignore L1/L2-incomplete(s)
aSUB.setsockopt( zmq.CONFLATE, 1 ) # do not re-recv "old"
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # do subscribe to whatever comes
aSUB.connect( <transport-class>:<port#> ) # tcp:? udp-multicast?
#-----------------------------------------------------------------------------[RTO]
while <any reason>:
try:
if ( aSUB.poll( zmq.POLLIN, 0 ) == 0 ):
# nothing in the receiving Queue ready-to-.recv()
# sleep()
# do some system work etc
else:
aPayLOAD = aSUB.recv( zmq.NOBLOCK )
#--------------------------------------------------------
# decompress / deserialise the original object
# capture = dill.loads( gzip.decompress( aPayLOAD ) )
#--------------------------------------------------------
# PROCESS THE DATA :
# ...
except:
# handle as per errno ...
finally:
pass
#----------------------------------------------------------------------
aSUB.close()
aCtx.term()
推荐阅读
- android - 如何仅在 Android Studio 的主要版本中检查更新?
- java - jdk11 scala 2.12.8 兼容性
- c# - 使用 C# 将文本文件数据导入 PostgreSQL 数据库
- android - 循环遍历两个数组,同时检查两个数组是否为空 onClick of a button
- mysql - 此 Oracle 触发器的 MySQL 等效代码
- python - 在烧瓶应用程序中添加 ipdb 时,它会引发 RuntimeError: There is no current event loop in thread
- node.js - 如何在 Windows 中使用批处理文件一起安装 nodejs 和 node red
- uitableview - swift4中tableview内的可扩展下拉菜单
- wordpress - 如何在多个 wordpress 块中使用通用背景颜色?
- angular - 页面加载时从下拉列表中获取默认值