python - Dask 如何决定是否重新运行任务
问题描述
我对 Dask 很陌生,并试图构建一个系统来执行具有依赖关系的计算图。但是,尽管某些任务具有静态签名,但它们却被执行了两次,这让我感到非常困惑。例如:
Python 3.7.5 (default, Nov 12 2019, 11:34:05)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask.distributed import Client
>>> client = Client()
>>> def a():
... print("a")
...
>>> client.gather(client.submit(a))
a
>>> client.gather(client.submit(a))
a
>>> client.submit(a)
<Future: pending, key: a-a5eb50e9015acdf60b1094aa4e467e00>
a
>>> client.submit(a)
<Future: finished, type: builtins.NoneType, key: a-a5eb50e9015acdf60b1094aa4e467e00>
>>> client.gather(client.submit(a))
>>> client.gather(client.submit(a))
>>>
因此,它看起来像是a()
使用 为每个调用执行的client.gather(client.submit(a))
,但仅在我client.submit(a)
自己调用之前执行,之后Future
重复使用相同的函数并且不再调用该函数。这是为什么?
在我的计算图中,当两个任务依赖于同一个任务时,这将是一个问题,该任务应该只执行一次。我目前处理此类依赖项(递归)的方法如下:
from dask.distributed import Client, worker_client
def x(n):
dgraph = {
'a': [],
'b': ['a'],
'c': ['b', 'a'],
'd': ['b', 'c']
}
print(n)
with worker_client() as client:
client.gather(list(client.submit(x, d) for d in dgraph[n]))
if __name__ == '__main__':
client = Client()
result = client.submit(x, 'd')
client.gather(result)
有趣的是,执行该脚本时 python 的输出并不稳定:
$ python test_dask2.py
d
b
c
a
b
a
a
$ python test_dask2.py
d
b
c
a
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-bff0c0d6e4239ae9c5beaed070018a1e'}
distributed.scheduler - CRITICAL - Tried writing to closed comm: {'op': 'lost-data', 'key': 'x-59dc11a9fc2db8a0885e47d3e5891304'}
$
有没有办法确保具有给定输入的给定任务即使我多次提交也只执行一次?如果我正确理解文档,那应该是正常行为。如果调用print
是防止这种情况发生的副作用,为什么它不一致,我如何防止生成输出文件的任务被执行两次?
另外,最后的错误是什么,并不总是发生?
编辑:
我想我明白了为什么我的一些任务在第二个片段中运行了多次:submit
即使多次提交相同的任务(甚至在任务完成之前),dask 分配给任务以进行识别的哈希值似乎有时也会有所不同并超出范围)。将key
参数设置为固定值(例如任务名称)submit
可以解决该问题。
解决方案
The short answer is: dask keeps a result in memory while something needs it. In these cases, "need" can be either a future in your session or another task that depends on the result.
In a line like client.gather(client.submit(a))
, the future generated by submit
is forgotten immediately after it is gathered. In a line like client.submit(a)
, the future generated is stored in the "last result" variable _
of the session, and so remains remembered and the cluster does not clear it.
If you wanted more control, you can assign these variables:
fut = client.submit(a) # sets func running, keeps hold of the future
fut2 = client.submit(a) # uses already existing task to get result
client.gather(fut), fut.result() # get results
del fut2, fut # "forget" futures, and have cluster release them
Remember to use the dashboard to see the current state of the cluster.
推荐阅读
- linux - 删除匹配行和所需行
- angular - 使用组件 getter 渲染路由器链接不起作用?
- objective-c - 在 Cocoa macOS 应用程序中,如何构建两个过滤所有项目或不过滤项目的 NSPredicates?
- java - 我无法获取数据库中的所有表名
- android - 如何更改 Flutter 应用程序的名称?
- mobx - 在 Mobx 中,一个复杂的组件可以拥有自己的商店吗?
- bash - 使文件成为可执行文件时,Bash 脚本命令会损坏
- ios - IOS,几个 UIButtons 链接到一个 IBAction - 滞后问题
- python - Django / DRF:过滤反向外键
- apache-spark - spark如何从HDFS加载文件以及它与RDD的关系