首页 > 解决方案 > 当 Dask 任务运行多次时,使用哪个结果?

问题描述

首先,阅读这个问题: Repeated task execution using the Distributed Dask scheduler

现在,当 Dask 由于工作人员窃取或任务失败(例如由于每个进程的内存限制)而决定重新运行任务时,哪个任务结果会传递到 DAG 的下一个节点?我们正在使用嵌套任务,例如

@dask.delayed
def add(n):
    return n+1

t_a = add(1)
t_b = add(t_a)
the_output = add(add(add(t_b)))

因此,如果其中一个任务失败或被盗,并且运行了两次,哪个结果会传递给 DAG 中的下一个节点?

感兴趣的人的进一步背景:出现这种情况的原因是我们的任务写入数据库。如果它运行两次,我们会得到一个完整性错误,因为它试图两次插入相同的记录(受约束idversion组合)。目前的计划是通过捕获任务中的完整性错误来使任务具有幂等性,但我仍然不明白 Dask 是如何“选择”结果的。

标签: daskdask-distributeddask-delayed

解决方案


如果你有类似的情况add(add(add(t_b)))

或更一般地说

x = add(1)
y = add(x)
z = add(y)

即使它们都使用相同的功能,它们都是独立的任务。Dask 发现它们有不同的输入,因此它以不同的方式对待它们。

因此,如果其中一个任务失败或被盗,并且运行了两次,哪个结果会传递给 DAG 中的下一个节点?

在所有这些情况下,集群上一次只有一个有效结果。被盗任务只能在新机器上运行,而不是旧机器上。如果任务的结果丢失并且必须重新运行,那么只有新值会出现在任何地方(旧值丢失了,请记住)。


推荐阅读