首页 > 解决方案 > Dask:如何将延迟函数与工作资源一起使用?

问题描述

我想制作一个包含 CPU 和 GPU 任务的 Dask Delayed 流。GPU 任务只能在 GPU Worker 上运行,而一个 GPU Worker 只有一个 GPU,一次只能处理一个 GPU 任务。

不幸的是,我看不到在延迟 API 中指定工作资源的方法。

下面是常用代码:

client = Client(resources={'GPU': 1})

@delayed
def fcpu(x, y):
    sleep(1)
    return x + y

@delayed
def fgpu(x, y):
    sleep(1)
    return x + y

这是用纯延迟编写的流程。此代码将无法正常运行,因为它不了解 GPU 资源。

# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)

# STEP TWO: two GPU tasks
c = fgpu(a, b)  # Requires 1 GPU
d = fgpu(a, b)  # Requires 1 GPU

# STEP THREE: final CPU task
e = fcpu(c, d)

%time e.compute()  # 3 seconds

这是我能想到的最好的解决方案。它结合了延迟语法和 Client.compute() 期货。它似乎表现正确,但它非常难看。

# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
a_future, b_future = client.compute([a, b]) # Wo DON'T want a resource limit

# STEP TWO: two GPU tasks - only resources to run one at a time
c = fgpu(a_future, b_future)
d = fgpu(a_future, b_future)
c_future, d_future = client.compute([c, d], resources={'GPU': 1})

# STEP THREE: final CPU task
e = fcpu(c_future, d_future)
res = e.compute()

有一个更好的方法吗?

标签: daskdask-distributeddask-delayed

解决方案


也许类似于https://jobqueue.dask.org/en/latest/examples.html中描述的方法这是在一台 GPU 机器或带有 SSD 的机器上处理的情况。

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})

推荐阅读