dask - Dask:如何将延迟函数与工作资源一起使用?
问题描述
我想制作一个包含 CPU 和 GPU 任务的 Dask Delayed 流。GPU 任务只能在 GPU Worker 上运行,而一个 GPU Worker 只有一个 GPU,一次只能处理一个 GPU 任务。
不幸的是,我看不到在延迟 API 中指定工作资源的方法。
下面是常用代码:
client = Client(resources={'GPU': 1})
@delayed
def fcpu(x, y):
sleep(1)
return x + y
@delayed
def fgpu(x, y):
sleep(1)
return x + y
这是用纯延迟编写的流程。此代码将无法正常运行,因为它不了解 GPU 资源。
# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
# STEP TWO: two GPU tasks
c = fgpu(a, b) # Requires 1 GPU
d = fgpu(a, b) # Requires 1 GPU
# STEP THREE: final CPU task
e = fcpu(c, d)
%time e.compute() # 3 seconds
这是我能想到的最好的解决方案。它结合了延迟语法和 Client.compute() 期货。它似乎表现正确,但它非常难看。
# STEP ONE: two parallel CPU tasks
a = fcpu(1, 1)
b = fcpu(10, 10)
a_future, b_future = client.compute([a, b]) # Wo DON'T want a resource limit
# STEP TWO: two GPU tasks - only resources to run one at a time
c = fgpu(a_future, b_future)
d = fgpu(a_future, b_future)
c_future, d_future = client.compute([c, d], resources={'GPU': 1})
# STEP THREE: final CPU task
e = fcpu(c_future, d_future)
res = e.compute()
有一个更好的方法吗?
解决方案
也许类似于https://jobqueue.dask.org/en/latest/examples.html中描述的方法这是在一台 GPU 机器或带有 SSD 的机器上处理的情况。
def step_1_w_single_GPU(data):
return "Step 1 done for: %s" % data
def step_2_w_local_IO(data):
return "Step 2 done for: %s" % data
stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]
result_stage_2 = client.compute(stage_2,
resources={tuple(stage_1): {'GPU': 1},
tuple(stage_2): {'ssdGB': 100}})
推荐阅读
- html - 需要代码行以从下拉列表中选择付款性质
- c++ - Freeimage FreeImage_ConvertTo24Bits 返回空指针
- javascript - 如何在api中发送数据而不会丢失
- cron - 在特定的时间间隔内以不同的频率运行 cron
- oracle - 部署复合材料时出现一致性错误
- java - 获取 java.sql.SQLException:列计数与第 1 行的值计数不匹配
- rule-engine - 如何编写 jsonlogic.js 规则来针对数组中的所有对象(一些)测试一个对象?
- c# - 是否考虑区域设置特定信息在 HTTP 绑定 Azure 函数中的自动反序列化?
- node.js - 通过不同的条件组合两行并更改 MongoDB 中的列名
- devops - 无法安装 Spin CLI (Spinnaker)