How does Dask decide whether to re-run a task

Question

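I am new to Dask and am trying to build a system that executes a computation graph with dependencies. However, I am confused that some tasks are executed twice even though they have a static signature. For example: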

Python 3.7.5 (default, Nov 12 2019, 11:34:05)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client()
>>> def a():
...   print("a")
...
>>> client.gather(client.submit(a))
a
>>> client.gather(client.submit(a))
a
>>> client.submit(a)
<Future: pending, key: a-a5eb50e9015acdf60b1094aa4e467e00>
a
>>> client.submit(a)
<Future: finished, type: builtins.NoneType, key: a-a5eb50e9015acdf60b1094aa4e467e00>
>>> client.gather(client.submit(a))
>>> client.gather(client.submit(a))
>>>

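So it looks like a() is executed on every call to client.gather(client.submit(a)), but only until I call client.submit(a) on its own; from then on the same Future is reused and the function is no longer called. Why is that?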

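In my computation graph this is a problem when two tasks depend on the same task, which should be executed only once. My current approach to handling such dependencies (recursively) is as follows: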

from dask.distributed import Client, worker_client

def x(n):
    dgraph = {
        'a': [],
        'b': ['a'],
        'c': ['b', 'a'],
        'd': ['b', 'c']
        }
    print(n)
    with worker_client() as client:
        client.gather(list(client.submit(x, d) for d in dgraph[n]))

if __name__ == '__main__':
    client = Client()
    result = client.submit(x, 'd')
    client.gather(result)

Interestingly, the output is not stable when I run this script with python:

$ python test_dask2.py
d
b
c
a
b
a
a
$ python test_dask2.py
d
b
c
a
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-bff0c0d6e4239ae9c5beaed070018a1e'}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-59dc11a9fc2db8a0885e47d3e5891304'}
$

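Is there a way to ensure that a given task with given inputs is executed only once, even if I submit it multiple times? If I understand the documentation correctly, that should be the normal behavior. If the call to print is a side effect that prevents this, why is the behavior inconsistent, and how can I prevent tasks that generate output files from being executed twice?

Also, what is the error at the end, and why does it not always occur?

Edit:

I think I have figured out why some of my tasks ran multiple times in the second snippet: the hash that dask assigns to a task for identification sometimes seems to differ (and to go out of scope), even when the same task is submitted multiple times (even before the task has finished). Setting the key parameter of submit to a fixed value (e.g. the task name) solves the problem.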
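The workaround from the edit can be sketched as follows. This is a minimal illustration, not the original script: make_output, demo_fixed_key, and the key "task-a" are hypothetical names, and Client(processes=False) is assumed only to keep the demo inside one process. The task returns a random stamp so that a second execution would be visible as a different result.

```python
import uuid

from dask.distributed import Client


def make_output(name):
    # Hypothetical stand-in for a task with side effects. The random suffix
    # changes on every real execution, so a re-run is easy to detect.
    return f"{name}-{uuid.uuid4().hex}"


def demo_fixed_key():
    # Assumption: an in-process cluster is enough for this demonstration.
    with Client(processes=False) as client:
        # Both submissions use the same explicit key, so the second call
        # returns a future for the task that already exists.
        first = client.submit(make_output, "a", key="task-a")
        second = client.submit(make_output, "a", key="task-a")
        return first.key, second.key, first.result(), second.result()


if __name__ == "__main__":
    print(demo_fixed_key())
```

Note that a fixed key has to be unique per combination of function and inputs; reusing one key for different inputs would hand back the result of whichever task was submitted first.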

Tags: python, dask, dask-distributed

Answer


The short answer is: Dask keeps a result in memory for as long as something needs it. Here, "need" means either a future held in your session or another task that depends on the result.

In a line like client.gather(client.submit(a)), the future created by submit is forgotten as soon as it has been gathered, so the cluster releases the result and a later submit runs the function again. In a line like client.submit(a), the future is stored in the session's "last result" variable _, so it stays referenced and the cluster does not clear it. Because gather returns None here, the later gather calls do not overwrite _, which is why they reuse the finished task and print nothing.

If you want more control, assign the futures to variables:

fut = client.submit(a)  # sets func running, keeps hold of the future
fut2 = client.submit(a)  # uses already existing task to get result
client.gather(fut), fut.result() # get results
del fut2, fut  #  "forget" futures, and have cluster release them

Remember to use the dashboard to see the current state of the cluster.
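To watch this behavior without relying on print output, here is a minimal sketch under a few assumptions: stamp and demo_held_future are hypothetical names, Client(processes=False) is used only so the demo runs in one process, and the half-second pause is an arbitrary allowance for the release message to reach the scheduler. Each real execution returns a fresh random value, so equal values mean the stored result was reused.

```python
import time
import uuid

from dask.distributed import Client


def stamp():
    # A new random value per execution makes re-runs observable.
    return uuid.uuid4().hex


def demo_held_future():
    with Client(processes=False) as client:
        fut = client.submit(stamp)
        first = fut.result()

        # fut is still referenced, so this submit finds the existing task.
        fut2 = client.submit(stamp)
        second = fut2.result()
        same_key = fut.key == fut2.key

        # Drop every reference; the cluster is then free to forget the result.
        del fut, fut2
        time.sleep(0.5)  # assumption: enough time for the release to propagate

        # Usually a fresh execution, but this depends on timing.
        third = client.submit(stamp).result()
        return same_key, first, second, third


if __name__ == "__main__":
    print(demo_held_future())
```

While the first future is held, first and second are identical. Whether third differs depends on the release having been processed before the new submission, which is the same timing sensitivity that makes the question's output unstable.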

