.map followed by .result() and/or client.gather() crashes if the workers are remote

Problem description

I need to call a function that does something complicated, and it needs to be iterated a few hundred thousand times (think Monte Carlo simulation). I'm here because it does not work with dask in my setup (first-time user). So I stripped the code down to the bare bones (see below) to figure out whether the problem is the human, the environment, or both :-). Hoping a guru here can help me out?

Code

from dask.distributed import Client

def do(x):
    y = x**2.0
    return y

client = Client("tcp://10.61.68.34:8786")   # connect to the remote scheduler
x = client.map(do, range(0, 1799))          # submit one task per input value
r = []
for i in x:
    r.append(i.result())                    # block until each future's result arrives
print(r)
del x
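
For reference, a minimal sketch of what the title means by the client.gather() variant, using the same client and do as above:

futures = client.map(do, range(0, 1799))
# gather() pulls all results back in one call instead of looping
# over the futures and calling .result() on each of them
results = client.gather(futures)
print(results[:5])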

SETUP
Scheduler: ip1, VMware VM, Windows OS, 128 cores, 200GB RAM, py 3.9, dask 2021.9.1
Worker: same as above

Network: intra-node (within a single node); inbound rules allow incoming TCP connections to 8786 and 8787. No specific rule is needed for connections to ephemeral ports, because the scheduler talks back to the workers on the ephemeral ports the workers used for their TCP SYN packets.
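
As a sanity check of those inbound rules, reachability of the scheduler ports can be probed with plain sockets (a sketch; 10.61.68.34 is the scheduler address used in the code above):

import socket

# Sketch: check that the scheduler's listening ports accept TCP connections
# from this machine (run it on the worker or client host).
for port in (8786, 8787):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(3)
        try:
            s.connect(("10.61.68.34", port))
            print(f"port {port}: reachable")
        except OSError as e:
            print(f"port {port}: NOT reachable ({e})")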

Starting the scheduler with the CLI:

dask-scheduler.exe

Starting the workers with the CLI:

dask-worker.exe --nprocs 100 --nthreads 1 tcp://<<ip1..>>:8786

dask.yml contains:

distributed:
  version: 2

  adaptive:
    interval: 1s
    target-duration: 15m

  admin:
    tick:
      interval: 20ms 
      limit: 300s 
  
  comm:
    retry:
      count: 5 
      offload: 100MiB
    
    zstd:
      level: 15 
      threads: 0   # need to respect Python's GIL as not all delayed code handles dask objects
  
  deploy:
    lost-worker-timeout: 30s  
    cluster-repr-interval: 750ms 

  logging:
      distributed: debug
      distributed.client: debug
      bokeh: critical
      tornado: critical
      tornado.application: error

  scheduler:
    bandwidth: 10000000000    # 10 Gb/s even though LAN is capable of 100Gbps
    default-data-size: 100kiB
    events-cleanup-delay: 300s
    idle-timeout: 1h    
  
  worker:
    connections:            # Maximum concurrent connections for data
      outgoing: 300          # This helps to control network saturation
      incoming: 300
    lifetime:
      duration: 1800s  
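
A quick way to confirm which of these values a running process has actually loaded is dask.config.get (a sketch; the key names mirror the YAML layout above and may differ between dask versions):

import dask

# Sketch: print a few of the values this process picked up from dask.yml
print(dask.config.get("distributed.comm.retry.count"))
print(dask.config.get("distributed.scheduler.bandwidth"))
print(dask.config.get("distributed.worker.connections.outgoing"))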

In IPython (launched from VSCode; the environment is set up correctly), once the map has completed and every future shows

<Future: finished, type: float, key: do-b4e893e37f05790c2d90f6f7a57bc865>,

calling x[0].result() yields the expected result (1.0)
calling x[1].result() yields the expected result (4.0)
and calling a custom range

y = []
for i in range(2, 10):
    y.append(x[i].result())
print(y)

also prints the expected answers.
Conclusion: the code runs as required.
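
For context only (not part of the run above): for collecting large numbers of futures, dask also offers as_completed, which yields futures as they finish instead of waiting on each one in submission order; a sketch with the same client and do:

from dask.distributed import as_completed

futures = client.map(do, range(0, 1799))
# each future handed back by as_completed has already finished,
# so calling .result() on it returns immediately
r = [f.result() for f in as_completed(futures)]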

Now, when the workers are started from a different machine (no local workers exist any more):
Scheduler: ip1, VMware VM, Windows OS, 128 cores, 200GB RAM, dask 2021.9.1
Worker: ip2, VMware VM, Windows OS, dask 2021.9.1
Worth mentioning: there is a 100Gbps network link between the two machines, and the firewalls allow 8786 and 8787 inbound on both.

IPython output when the for loop is called; it continues ad infinitum (until interrupted manually):

distributed.client - DEBUG - Waiting on futures to clear before gather
distributed.comm.tcp - DEBUG - Setting TCP keepalive: idle=10, interval=2
distributed.comm.tcp - DEBUG - Setting TCP keepalive: idle=10, interval=2
distributed.comm.tcp - DEBUG - Setting TCP keepalive: idle=10, interval=2
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-9755ae4fe21461f71d76472d06ed69b7'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-4393edb8f90330ff85a14a347b57275f'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-b2354f9980c5f46e83153d4f2583f6f1'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-fa2cd7fe07c50c80dda8b5a308a7655c'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-80e7f4ee2b14bc0276a09b77db1e9d43'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-16311a9900ff7294aef0f0f3432b82fb'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-8ba4db219b30ddb8ee8ed086c752e72c'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-37eb0abfc3030871834fbb4ce6fdcf6e'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-5f894a60cd293af00b628a1c9d6972d0'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-e762686340db94c9b9617ba959694ecf'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-4e270441aebb8ed2a00f48ca3fc0391b'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-b1bd0b36b8e3b282d2caf42d373ccedd'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-a93dc9cc91d1a7e3fa7bc88673b7e33e'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-68324d1e2f16f3dd3ed2a4521280a1f1'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-94e3a7bdb4065cf34d47f6667ed5a695'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-0dbc31f32cb02f4925ca30ac92064eb1'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-70336d000742ece9b066b6c111f20fd3'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-ecd6c5c5caf0fe8d5499576f9f53764e'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-cb1d841d06994f4f5cba045e2c65e881'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-3015b21187427be5873d6477544063ae'}
distributed.client - DEBUG - Client receives message {'op': 'lost-data', 'key': 'do-f63da4cc3e1e26d64cc927f622d2631b'}
...

Sample output from the scheduler:

distributed.core - DEBUG - Message from 'tcp://<i3p>:53905': {'op': 'heartbeat_worker', 'address': 'tcp://<ip2>:53784', 'now': 1633623013.8946366, 'metrics': {'executing': 0, 'in_memory': 29, 'ready': 0, 'in_flight': 0, 'bandwidth': {'total': 10000000000, 'workers': {}, 'types': {}}, 'spilled_nbytes': 0, 'cpu': 0.0, 'memory': 104374272, 'time': 1633623013.8877068, 'read_bytes': 6227.017170580167, 'write_bytes': 19296.954301894224, 'read_bytes_disk': 0.0, 'write_bytes_disk': 10238.384044113825}, 'executing': {}, 'reply': True}
distributed.core - DEBUG - Calling into handler heartbeat_worker

The Bokeh status page shows no errors.

Let's pick a sample task from the log above and check its state at the worker:

distributed.worker - DEBUG - Send compute response to scheduler: do-b6553fa57215f934079c36275fe3985d, {'op': 'task-finished', 'status': 'OK', 'nbytes': 24, 'type': <class 'float'>, 'start': 1633623109.3889816, 'stop': 1633623109.3890078, 'thread': 14244, 'key': 'do-b6553fa57215f934079c36275fe3985d'}

Once all tasks are in the 'task-finished' state at the workers, the debug messages only show regular heartbeats, nothing special.

However, there are no results! The scheduler and worker logs both show activity, but that is all.

The local output (IPython or a VSCode run) shows:

distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'do-473302f1b42c06c5a75fe179c7ffe473': ('tcp://10.61.68.33:54198',)}
distributed.client - DEBUG - Waiting on futures to clear before gather
distributed.client - DEBUG - Client receives message {'op': 'key-in-memory', 'key': 'do-473302f1b42c06c5a75fe179c7ffe473'}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'do-473302f1b42c06c5a75fe179c7ffe473': ('tcp://10.61.68.33:54264',)}
distributed.client - DEBUG - Waiting on futures to clear before gather
distributed.client - DEBUG - Client receives message {'op': 'key-in-memory', 'key': 'do-473302f1b42c06c5a75fe179c7ffe473'}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'do-473302f1b42c06c5a75fe179c7ffe473': ('tcp://10.61.68.33:54198',)}
distributed.client - DEBUG - Waiting on futures to clear before gather
distributed.client - DEBUG - Client receives message {'op': 'key-in-memory', 'key': 'do-473302f1b42c06c5a75fe179c7ffe473'}

After a few retries it gives up, an exception is thrown, and the program ends. This happens every time I run it with remote workers.
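
In case it helps with the diagnosis: the warning above names the worker that holds the key (tcp://10.61.68.33:54198), and gathering means that data has to be fetched off that worker's port, either by the client or by the scheduler on its behalf. Client.gather() exposes a direct flag for choosing between the two paths; a sketch of both calls, reusing client and do from the snippet at the top (whether either one changes the behaviour here is exactly what I don't know):

futures = client.map(do, range(0, 1799))
res_via_scheduler = client.gather(futures, direct=False)  # scheduler relays the data to the client
res_direct = client.gather(futures, direct=True)          # client connects to the workers itself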

If the same code is refactored to use .persist() followed by client.gather(), the behaviour gets really strange. In that case the workers go into a 'task-forgotten' state after the tasks finish and the connection is forcibly closed. I'm happy to start a separate thread on that one. However, I'm sure it's something really basic that is tripping me up here. Help would be much appreciated.

Tags: python, dask, dask-distributed
