python - 根据之前的任务结果修改 Dask 任务图
问题描述
我有两个任务;一慢一快。这些任务在各自的数据上迭代运行。我偶尔会根据更快任务的输出执行检查,在某些情况下,检查会告诉我交换来自两个来源的数据。
我想要做的是让这个工作流在等待慢速任务时运行而没有不必要的阻塞。我想到的第一种方法是让 Dask 正在执行的底层 DAG 依赖于其中一个任务的输出。如果check
任务说我不需要交换,那么我可以立即运行下一个fast_data_task
. 这些 DAG 说明了这个想法:黑色 DAG 是阻塞版本,如果我们假设不交换数据,红色 DAG 是非阻塞的,如果我们假设我们应该交换数据check
,蓝色 DAG 是非阻塞的check
. Dask中有没有办法根据之前任务的结果修改任务依赖关系?(或者有没有其他机制可以达到同样的效果?)
另一种可能的方法可能是如果我真的可以将其发送Future
给工人;即,发送对结果的引用,而不是结果本身。在这种情况下,更新任务可能取决于布尔结果check
和两个“未来引用”。这些“未来参考”永远不会被阻塞(它们只是用来提供任务图的正确输入/输出;结果的实际内容不用于更新任务)。更新任务返回的Future
只是对正确者的引用。但是,我不知道 Dask 是否可以做到这一点。
下面是一个使用阻塞(黑色 DAG)的示例代码。我希望可以对其进行修改以提供相同的结果,但在等待slow_data_task
完成时不要在“更新”阶段阻塞。(另外,我想避免阻塞以后的运行other_task
——这种交换只是更大的任务图的一部分!)
import distributed
import time
import numpy as np
def slow_data_task(input_data):
time.sleep(5)
return input_data
def fast_data_task(input_data):
return input_data
def other_task(input_data):
return input_data
def check(fast_data):
do_mixing = np.random.choice([True, False])
return fast_data, do_mixing
def update_fast_data(check_1_result, slow_data):
fast_data, needed = check_1_result
if needed:
fast_data = slow_data
return fast_data
def update_slow_data(check_1_result, slow_data):
fast_data, needed = check_1_result
if needed:
slow_data = fast_data
return slow_data
client = distributed.Client()
fast_data = 'foo'
slow_data = 'bar'
other_data = 'qux'
n_iterations = 2
for iteration in range(n_iterations):
slow_data = client.submit(slow_data_task, slow_data)
fast_data = client.submit(fast_data_task, fast_data)
other_data = client.submit(other_task, other_data)
check_1_result = client.submit(check, fast_data)
new_fast_data = client.submit(update_fast_data, check_1_result, slow_data)
new_slow_data = client.submit(update_slow_data, check_1_result, slow_data)
fast_data = new_fast_data
slow_data = new_slow_data
更多细节
我想出了一种适用于这个玩具系统的方法,但在我的实际用例中失败了。这会将任务的输出更改为(data, task_type)
、 where ,并将andtask_type in ['fast', 'slow']
替换为,根据的值,其内部行为会有所不同。只需 1 个交换耦合这些,就可以了。但是在实际系统中,许多不同的数据源之间可能存在耦合(快、慢、快、慢),这种方法将无法知道哪些是交换中的其他数据。(任务取决于其他数据源的名称,即 ,即使它不依赖于特定数据。)fast_data_task
slow_data_task
data_task
task_type
check
'slow'
解决方案
推荐阅读
- android - 将上传的图片从用户图库传递到另一个活动,然后将图片上传到 Firebase
- authentication - Nuxt - 如何防止登录后重定向到主页
- python - 如何销毁使用 for 循环生成的按钮(单击后)
- visual-c++ - 具有多个条件的 if 语句无法使用 || 或 $$
- html - 个人资料卡 - 将图像放在我的 div 的底部中心
- python - 尝试将斐波那契项添加到文件名时获取不完整的数据
- rust - Rust 中 Bool 的打印值
- r - 在 .Rprofile 为什么不加载(“.RData”)工作,但它会在 .Rprofile 调用的 R 脚本中工作?
- java - 如何进入 Cream Text Editor 的命令模式?
- python - 如何将 unicode 写入 txt?Python