首页 > 解决方案 > AWS Batch 上的进程间通信

问题描述

是否可以使用 ZeroMQ 在 AWS Batch 上的作业和本地父进程之间进行进程间通信?如果是这样,我可以参考一个简单的例子吗?我很难找到这方面的文件。

我希望clustermqR 包有一天会支持 AWS Batch 后端:https ://github.com/mschubert/clustermq/issues/208 。clustermq已经可以向 SLURM 等传统作业调度程序上的工作人员发送 R 函数调用,并使用 ZeroMQ 与工作人员进行通信。

标签: ramazon-web-serviceszeromqaws-batch

解决方案


“是否可以使用 ZeroMQ 在 AWS Batch 上的作业和本地父进程之间进行进程间通信?”

ZeroMQ 是一个可以做的工具箱,所以唯一的问题是云所有者规定的策略(加上修饰因素瓶颈,因为这些一直是新的与奇点相关的问题,如果有人想要的话,一个 SPoF。 .)

ZeroMQ 的分布式计算本身没有其他问题。


PoC 示例

我们可以在所有远程节点上实例化此代码并让它们运行并与您的本地主机通信,确定是否没有被云所有者的一组策略阻止,并且您是否可以让接收方在云引发的海啸中幸存下来;o)

设置了连接性,以便远程发送者代码“知道”本地主机的公共 IP 地址,或类似动机的 SSH 隧道元平面,通过 SSH 端口转发隧道调解互连,从而允许所有云远程发送者(如果没有被云所有者阻止......)实际上到达 localhost 私有 IP 地址,就好像它们在同一个私有 LAN 段中一样。

(为便于交叉移植而提供的 Python 代码)


本地主机接收器模型示例:

import zmq;         nIOthreads = 4 ### traffic dependent, given an expected ingress Tsunami
aCtx = zmq.Context( nIOthreads )
aSub = aCtx.socket( zmq.XSUB )
aSub.bind( "tcp://<_aPublicIpADDRESS_or_SSL_tunnelled_privateIpADDRESS_>:<_port#_>" )
aSub.setsockopt( zmq.LINGER, 0 )
aSub.setsockopt( zmq.CONFLATE, 0 )
aSub.setsockopt( zmq.SUBSCRIBE, "" )
...
while True:
      try:
           ret = aSub.recv( zmq.NOBLOCK )
           if ret:
                 print( "GOT: {0:}".format( repr( ret ) ) )

      except:
           print( "EXC: will terminate" )
           break

      finally:
           aSub.close()
           aCtx.term()

print( "FIN:  did terminate and process will sysexit." )

云远程发件人模型示例:

import socket
import zmq;         nIOthreads = 1 ### beware of the grooming-factor effects on traffic
aCtx = zmq.Context( nIOthreads )
aPub = aCtx.socket( zmq.XPUB )
aPub.connect( "tcp://<_aPublicIpADDRESS_or_SSL_tunnel_to_privateIpADDRESS_>:<_port#_>" )
aPub.setsockopt( zmq.LINGER, 0 )
aPub.setsockopt( zmq.CONFLATE, 0 )
...
aCloudHostMsgN = 1
aCloudHostNAME = socket.gethostname()
aCloudHostIP   = socket.gethostbyname( aCloudHostNAME )
aMsgMASK       = ( "msg[{0:_>9d}]"
                 + "_from_{0:_>20s}_ip_{1:_>20s}".format( aCloudHotsNAME,
                                                          aCloudHostIP
                                                          )
                   )
while True:
      try:
           aPub.send( aMsgMASK.format( aCloudHostMsgN ) )
           pass;                       aCloudHostMsgN++

      except:
           print( "EXC: will terminate" )
           break

      finally:
           aPub.close()
           aCtx.term()

print( "FIN:  did terminate and process will sysexit." )

推荐阅读