python - Failing a Dask application when too many workers fail
Problem description
I am running a Dask (1.2) application with Dask-YARN (0.6.0) on an EMR cluster. Today my workers started failing (due to an HDFS error), and skein.ApplicationMaster kept creating replacement workers indefinitely. Is there a way to tell Dask-YARN to cancel the application if too many workers fail?
Specifically, my Application Master log looks like this:
19/06/21 16:00:27 INFO skein.ApplicationMaster: RESTARTING: adding new container to replace dask.worker_805.
19/06/21 16:00:27 INFO skein.ApplicationMaster: REQUESTED: dask.worker_806
19/06/21 16:00:27 WARN skein.ApplicationMaster: FAILED: dask.worker_804 - Could not obtain block: BP-1234110000-10.174.17.184-1561122672601:blk_1073741831_1007 file=/user/hadoop/.skein/application_1561122685021_0003/FED3ABF369AAE224B4BB8A3A77120E1C/cached_volume.sqlite3
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1234110000-10.174.17.184-1561122672601:blk_1073741831_1007 file=/user/hadoop/.skein/application_1561122685021_0003/FED3ABF369AAE224B4BB8A3A77120E1C/cached_volume.sqlite3
at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:983)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:642)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:882)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:85)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:59)
at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:119)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:366)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:267)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
...and this repeats without end.
Solution
If you are using the main constructor, you can set the maximum number of worker restarts with the worker_restarts
keyword argument:
# Allow a maximum of 3 worker restarts before failure
cluster = YarnCluster(worker_restarts=3, ...)
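The effect of the limit can be illustrated with a small pure-Python sketch (this is not dask-yarn code; the function and return values are hypothetical): the application master replaces failed workers only while the failure count stays within the limit, and gives up once it is exceeded.

```python
def handle_worker_failures(failures, worker_restarts):
    """Simulate the restart policy (illustrative only).

    Returns 'RUNNING' while failures stay within the limit, and
    'APPLICATION_FAILED' once the limit is exceeded.
    """
    restarts = 0
    for _ in range(failures):
        if restarts >= worker_restarts:
            # Limit exhausted: stop requesting replacement containers
            return "APPLICATION_FAILED"
        # Replace the failed worker with a new container
        restarts += 1
    return "RUNNING"

print(handle_worker_failures(failures=2, worker_restarts=3))  # RUNNING
print(handle_worker_failures(failures=5, worker_restarts=3))  # APPLICATION_FAILED
```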
Alternatively, if you are using a custom specification, you can set the maximum number of allowed restarts with max_restarts:
# /path/to/spec.yaml
name: dask
queue: myqueue

services:
  dask.worker:
    # Don't start any workers initially
    instances: 0
    # A maximum of 3 worker failures are allowed before failure
    max_restarts: 3
    # Restrict workers to 4 GiB and 2 cores each
    resources:
      memory: 4 GiB
      vcores: 2
    # Distribute this python environment to every worker node
    files:
      environment: /path/to/my/environment.tar.gz
    # The bash script to start the worker
    # Here we activate the environment, then start the worker
    script: |
      source environment/bin/activate
      dask-yarn services worker
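Before submitting, it can be worth sanity-checking that the restart limit actually made it into the spec. A minimal sketch (the dict below mirrors the YAML above, so no YAML parser is needed; the helper function is hypothetical):

```python
# Plain-dict mirror of the YAML spec above (illustrative only)
spec = {
    "name": "dask",
    "queue": "myqueue",
    "services": {
        "dask.worker": {
            "instances": 0,
            "max_restarts": 3,
            "resources": {"memory": "4 GiB", "vcores": 2},
        }
    },
}

def restart_limit(spec, service="dask.worker"):
    """Return the configured max_restarts for a service, or None if unset
    (meaning workers are restarted without limit)."""
    return spec["services"][service].get("max_restarts")

print(restart_limit(spec))  # 3
```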