首页 > 解决方案 > 根据之前的任务结果修改 Dask 任务图

问题描述

我有两个任务;一慢一快。这些任务在各自的数据上迭代运行。我偶尔会根据更快任务的输出执行检查,在某些情况下,检查会告诉我交换来自两个来源的数据。

我想要做的是让这个工作流在等待慢速任务时运行而没有不必要的阻塞。我想到的第一种方法是让 Dask 正在执行的底层 DAG 依赖于其中一个任务的输出。如果check任务说我不需要交换,那么我可以立即运行下一个fast_data_task. 这些 DAG 说明了这个想法:黑色 DAG 是阻塞版本,如果我们假设不交换数据,红色 DAG 是非阻塞的,如果我们假设我们应该交换数据check,蓝色 DAG 是非阻塞的check. Dask中有没有办法根据之前任务的结果修改任务依赖关系?(或者有没有其他机制可以达到同样的效果?)

三个 DAG

另一种可能的方法可能是如果我真的可以将其发送Future给工人;即,发送对结果的引用,而不是结果本身。在这种情况下,更新任务可能取决于布尔结果check和两个“未来引用”。这些“未来参考”永远不会被阻塞(它们只是用来提供任务图的正确输入/输出;结果的实际内容不用于更新任务)。更新任务返回的Future只是对正确者的引用。但是,我不知道 Dask 是否可以做到这一点。

下面是一个使用阻塞(黑色 DAG)的示例代码。我希望可以对其进行修改以提供相同的结果,但在等待slow_data_task完成时不要在“更新”阶段阻塞。(另外,我想避免阻塞以后的运行other_task——这种交换只是更大的任务图的一部分!)

import distributed
import time
import numpy as np

def slow_data_task(input_data):
    time.sleep(5)
    return input_data

def fast_data_task(input_data):
    return input_data

def other_task(input_data):
    return input_data

def check(fast_data):
    do_mixing = np.random.choice([True, False])
    return fast_data, do_mixing

def update_fast_data(check_1_result, slow_data):
    fast_data, needed = check_1_result
    if needed:
        fast_data = slow_data
    return fast_data

def update_slow_data(check_1_result, slow_data):
    fast_data, needed = check_1_result
    if needed:
        slow_data = fast_data
    return slow_data

client = distributed.Client()

fast_data = 'foo'
slow_data = 'bar'
other_data = 'qux'
n_iterations = 2
for iteration in range(n_iterations):
    slow_data = client.submit(slow_data_task, slow_data)
    fast_data = client.submit(fast_data_task, fast_data)
    other_data = client.submit(other_task, other_data)
    check_1_result = client.submit(check, fast_data)
    new_fast_data = client.submit(update_fast_data, check_1_result, slow_data)
    new_slow_data = client.submit(update_slow_data, check_1_result, slow_data)
    fast_data = new_fast_data
    slow_data = new_slow_data

更多细节

我想出了一种适用于这个玩具系统的方法,但在我的实际用例中失败了。这会将任务的输出更改为(data, task_type)、 where ,并将andtask_type in ['fast', 'slow']替换为,根据的值,其内部行为会有所不同。只需 1 个交换耦合这些,就可以了。但是在实际系统中,许多不同的数据源之间可能存在耦合(快、慢、快、慢),这种方法将无法知道哪些是交换中的其他数据。(任务取决于其他数据源的名称,即 ,即使它不依赖于特定数据。)fast_data_taskslow_data_taskdata_tasktask_typecheck'slow'

标签: pythondask

解决方案


推荐阅读