首页 > 解决方案 > Dask分布式worker任务分配

问题描述

Dask 默认在任务提交时分发。如果我们有一个任务运行时间取决于输入,并且某些输入可能导致任务运行时间比其他输入长得多,那么我们最终可能会将所有长时间运行的任务分配给一个工作人员。下面是一个例子来说明这个问题:

import time

from distributed import Client, wait


def uneven_task(num):
    """If num is odd number, it will take much longer to complete"""
    print(f"Running task {num}")
    if num % 2 != 0:
        time.sleep(num*100)
    print(f"Task {num} has finished. Worker is free.")


if __name__ == "__main__":
    client = Client(address="xx.xxx.xx.xx:8786")
    futures = []
    for i in range(10):
        future = client.submit(uneven_task, i, priority=1)
        futures.append(future)

    wait(futures)

2名工人开始。提交了 10 个任务。奇数输入会导致任务长,但偶数任务会很快。我可以看到所有 5 个短期任务都分配给了一名工作人员,而其他 5 个长期运行的任务都分配给了其他工作人员。 仪表板

有什么方法可以配置 dask 仅在工作人员空闲时分配任务,而不是在提交任务时分配它们?

标签: pythondaskdask-distributed

解决方案


推荐阅读