首页 > 解决方案 > 如何使用 dask 分布式?

问题描述

我试图通过查看代码示例和文档来使用 Dask,但无法理解它是如何工作的。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。

我尝试的第一件简单的事情是这样的:

from dask.distributed import Client
import dask.bag as db

if __name__ == '__main__':
    client = Client(n_workers=2)

print("hello world")

hello world印刷了三次,我认为这是因为工人。我假设除非调用计算,否则不会启动工作人员。我可以将我的打印语句移动到一个函数:

if __name__ == '__main__':
    client = Client(n_workers=2)

def print_func():
    print("hello world")

但是,如何确保只有一个工作人员执行此功能?(在 MPI 中,我可以通过使用来做到这一点rank == 0;我没有找到任何类似的东西MPI_Comm_rank()可以告诉我 Dask 中的工人编号或 ID)。

我更进一步,开始使用 Dask 中提供的示例:

from dask.distributed import Client
import dask.bag as db

if __name__ == '__main__':
    client = Client()

def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())

但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase. 我假设它dask.bag会自动拆分计算工作。对于一篇冗长的帖子,我深表歉意,但我无法理解 Dask(我习惯于 MPI 和 OpenMP 编程)。

标签: pythondaskdask-distributed

解决方案


但是,如何确保只有一个工作人员执行此功能?(在 MPI 中,我可以通过使用来做到这一点rank == 0;我没有找到任何类似的东西MPI_Comm_rank()可以告诉我 Dask 中的工人编号或 ID)。

这实际上是if __name__ == '__main__'块正在检查的内容。当您的脚本直接运行时,该条件为真;当工作人员将其作为模块导入时,情况并非如此。您放在此块之外的任何代码都由每个工人在启动时运行;这应该仅限于函数定义和必要的全局设置。所有实际工作的代码都需要在if __name__ == '__main__'块中,或者在仅从该块内部调用的函数内部。


推荐阅读