Dask workers time out shortly after startup

Problem description

Good afternoon SO, I'm trying to deploy a WRF post-processing solution in Python using Dask and wrf-python running on a cluster, but I'm running into connectivity problems between the dask scheduler and the worker instances.

In my process, I have a scheduler that is deployed from the main script (running on the cluster's login node) with the following code block:

    import socket
    from threading import Thread
    from tornado.ioloop import IOLoop
    from distributed import Scheduler, Client

    # Run the scheduler's event loop in a background thread
    cLoop = IOLoop.current()
    t = Thread(target = cLoop.start, daemon = True)
    t.start()
    s = Scheduler(loop = cLoop, dashboard_address = None)
    s.start("tcp://:" + str(scheduler_port))
    dask_client = Client("tcp://" + socket.gethostname() + ":" + str(scheduler_port))
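
For reference, recent releases of distributed start the Scheduler asynchronously rather than with a blocking start() call as above; a minimal sketch of the equivalent setup under that API (assuming a recent distributed version, this is not the post's code):

    import asyncio
    from distributed import Scheduler

    async def run_scheduler(port):
        # Scheduler is an async context manager in recent distributed
        # releases; it listens on the given port until shut down
        async with Scheduler(port=port, dashboard_address=None) as scheduler:
            await scheduler.finished()

    asyncio.run(run_scheduler(12345))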

This script then waits for the dask workers to come up on the system's compute nodes (a sketch of that wait follows the job script below); the workers are initialized through two shell scripts, one for the job and one to launch the worker:

    #!/bin/bash
    #COBALT -t 60
    #COBALT -n 8
    #COBALT -A climate_severe
    #COBALT -q debug-cache-quad
    #COBALT --attrs mcdram=cache:numa=quad

    # One launcher per node: 8 ranks total (-n), 1 rank per node (-N),
    # 64 cores per rank (-d), 1 hardware thread per core (-j)
    aprun -n ${COBALT_JOBSIZE} -N 1 -d 64 -j 1 ./launch-worker.sh
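
While the job spins up, the main script's wait for the workers (mentioned above) can be made explicit. A minimal sketch, not the post's actual code, using Client.wait_for_workers(), which is available in recent distributed releases; the worker count of 8 matches the job script:

    # Block until all 8 aprun-launched workers have registered
    dask_client.wait_for_workers(n_workers=8)
    # List the worker addresses the scheduler sees, to confirm registration
    print(list(dask_client.scheduler_info()["workers"]))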

The second script is generated programmatically, based on the login node that runs the original script and on configuration settings inside the program:

    #!/bin/bash
    export PYTHONPATH=${PYTHONPATH}:/projects/climate_severe/wrf-run/post/Python/

    /projects/climate_severe/Python/anaconda/bin/python3.7 -m distributed.cli.dask_worker \
        thetalogin4:12345 --nprocs 1 \
        --death-timeout 120 --no-dashboard
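
For context, a minimal sketch of how such a launcher could be generated programmatically; the helper name, the port argument, and the use of socket.gethostname() are assumptions for illustration, not the post's actual generator:

    import os
    import socket
    import stat

    def write_worker_launcher(path, scheduler_port):
        # Hypothetical generator: builds the launcher around the login
        # node's hostname and the configured scheduler port
        script = (
            "#!/bin/bash\n"
            "export PYTHONPATH=${PYTHONPATH}:"
            "/projects/climate_severe/wrf-run/post/Python/\n\n"
            "/projects/climate_severe/Python/anaconda/bin/python3.7 "
            "-m distributed.cli.dask_worker \\\n"
            "    %s:%d --nprocs 1 \\\n"
            "    --death-timeout 120 --no-dashboard\n"
            % (socket.gethostname(), scheduler_port)
        )
        with open(path, "w") as f:
            f.write(script)
        # Make the launcher executable so aprun can run it
        os.chmod(path, os.stat(path).st_mode | stat.S_IEXEC)

    write_worker_launcher("./launch-worker.sh", 12345)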

This setup works, and the workers connect to the scheduler, but they terminate roughly one to two minutes after the initial connection is established. The scheduler does not push any errors to the Python terminal (debug printing is turned on via logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)). The worker information is pushed to the job error file generated during the run:

distributed.worker - INFO -       Start worker at:  tcp://10.236.16.130:16839
distributed.worker - INFO -          Listening to:  tcp://10.236.16.130:16839
distributed.worker - INFO - Waiting to connect to:    tcp://thetalogin4:12345
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                        256
distributed.worker - INFO -                Memory:                  202.69 GB
distributed.worker - INFO -       Local Directory: /lus/theta-fs0/projects/climate_severe/runs/20180601/postprd/worker-4u3lent8
.
.
.
distributed.core - INFO - Event loop was unresponsive in Worker for 25.80s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.worker - ERROR - Worker stream died during communication: tcp://10.236.16.120:15761
Traceback (most recent call last):
  File "/projects/climate_severe/Python/anaconda/lib/python3.7/site-packages/distributed/comm/core.py", line 218, in connect
    quiet_exceptions=EnvironmentError,
  File "/projects/climate_severe/Python/anaconda/lib/python3.7/site-packages/tornado/gen.py", line 729, in run
    value = future.result()
tornado.util.TimeoutError: Timeout

Following this error, a number of further messages about StreamClosedError and missing dependencies are pushed in quick succession:

OSError: Timed out trying to connect to 'tcp://10.236.16.120:15761' after 10 s: in <distributed.comm.tcp.TCPConnector object at 0x2aaaed9616a0>: ConnectionRefusedError: [Errno 111] Connection refused
distributed.worker - INFO - Can't find dependencies for key ('wrapped_add_then_div-fe3451c36b590fa821c9101013c573b4', 0, 0, 0)
distributed.worker - INFO - Dependent not found: ('getitem-6f7afbff56ac240317ffbfde59bfcb8a', 0, 0, 0) 0 .  Asking scheduler
distributed.worker - ERROR - Worker stream died during communication: tcp://10.236.16.130:28228

The requested functions live in a Python script located in the PYTHONPATH directory (set in the launch-worker.sh script).
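
As a side note, one way to rule out PYTHONPATH problems on the workers is to ship the module through the client itself; Client.upload_file() is part of the distributed API, though the module filename below is a guess, not the post's actual file:

    # Sketch: push the post-processing module to every worker, bypassing
    # the PYTHONPATH export in launch-worker.sh.
    # "PyPostTools.py" is a hypothetical filename.
    dask_client.upload_file(
        "/projects/climate_severe/wrf-run/post/Python/PyPostTools.py")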

What confuses me is that I had a minimal working version of this package (one node, 8 processes) that ran fine with a handful of variables, but once I started increasing the number of variables being processed, it began failing like this. I have tried changing the setup to 8 nodes with 1 process each to raise the memory headroom from 16 GB to the full ~200 GB, and I have even cut the script down to post-processing only a single variable; so far nothing has completed successfully (I get the same errors every time).
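
For reference, the 10 s connect timeout seen in the logs above is configurable; a sketch of raising the comm timeouts before creating the client (an assumption on my part, not a step from the attempts above; the keys exist in distributed's config, the values are guesses):

    import dask

    # Give slow compute-node interconnects more time before a connection
    # attempt is declared dead (the connect default is on the order of 10 s)
    dask.config.set({
        "distributed.comm.timeouts.connect": "60s",
        "distributed.comm.timeouts.tcp": "120s",
    })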

Any help identifying the source of the problem here would be greatly appreciated; the full code is available on GitHub if additional context is needed.

Thanks!

Tags: python, dask, dask-distributed
