首页 > 解决方案 > pyzmq 同时发送到多个客户端

问题描述

我有一个 Raspberry Pi 客户端和 4 个 Raspberry Pi 服务器。我希望客户端同时向所有 4 个服务器发送字符串消息以捕获图像。现在我正在按顺序使用类似下面的东西。

socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)

更改为发布/订阅模型会改善客户端接收消息的距离吗?我想让 4 个客户端在 5 毫秒或更短的时间内收到捕获消息。

标签: pythonnetworkingraspberry-pipyzmq

解决方案


欢迎来到零之禅:

虽然我们获得零保修,但我们可能会在适当的方向上采取一些步骤。如果是 ZeroMQ 的新手,请随时阅读此处的一些帖子,并至少阅读“ ZeroMQ原则在不到 5 秒内”,然后再深入了解更多细节:

客户代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context( 4 )                                # request 4-I/O-threads
   aPUB = aCtx.socket( zmq.PUB )                          # PUB-instance
   aPUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aPUB.setsockopt(    zmq.SNDBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aPUB.setsockopt(    zmq.SNDHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aPUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aPUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-send "old"
   aPUB.bind( <transport-class>:<port#> )                 # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   # may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
   #                 yields reduced sizes of the serialised <capture> data
   #                 at costs of about ~30~60 [ms] on either side
   #                 which may lower the network traffic and .SNDBUF-sizing issues
   #----------------------------------------------------------------------
   while <any reason>:
         try:
              aPUB.send( aPayLOAD, zmq.NOBLOCK )
         except:
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aPUB.close()
   aCtx.term()

服务器代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context()                                   # request 4-I/O-threads
   aSUB = aCtx.socket( zmq.SUB )                          # SUB-instance
   aSUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aSUB.setsockopt(    zmq.RCVBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aSUB.setsockopt(    zmq.RCVHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aSUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aSUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-recv "old"
   aSUB.setsockopt(    zmq.SUBSCRIBE, "" )                # do subscribe to whatever comes
   aSUB.connect( <transport-class>:<port#> )              # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   while <any reason>:
         try:
              if ( aSUB.poll( zmq.POLLIN, 0 ) == 0 ):
                 # nothing in the receiving Queue ready-to-.recv()
                 # sleep()
                 # do some system work etc
              else:
                 aPayLOAD = aSUB.recv( zmq.NOBLOCK )
                 #--------------------------------------------------------
                 # decompress / deserialise the original object
                 # capture = dill.loads( gzip.decompress( aPayLOAD ) )
                 #--------------------------------------------------------
                 # PROCESS THE DATA :
                 # ...
         except: 
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aSUB.close()
   aCtx.term()

推荐阅读