python - 如何使用 dask 分布式?
问题描述
我试图通过查看代码示例和文档来使用 Dask,但无法理解它是如何工作的。正如文档中所建议的,我正在尝试使用分布式调度程序(我还计划在 HPC 上部署我的代码)。
我尝试的第一件简单的事情是这样的:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client(n_workers=2)
print("hello world")
hello world
印刷了三次,我认为这是因为工人。我假设除非调用计算,否则不会启动工作人员。我可以将我的打印语句移动到一个函数:
if __name__ == '__main__':
client = Client(n_workers=2)
def print_func():
print("hello world")
但是,如何确保只有一个工作人员执行此功能?(在 MPI 中,我可以通过使用来做到这一点rank == 0
;我没有找到任何类似的东西MPI_Comm_rank()
可以告诉我 Dask 中的工人编号或 ID)。
我更进一步,开始使用 Dask 中提供的示例:
from dask.distributed import Client
import dask.bag as db
if __name__ == '__main__':
client = Client()
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
print(c.compute())
但这显示了一个错误:An attempt has been made to start a new process before the current process has finished its bootstrapping phase
. 我假设它dask.bag
会自动拆分计算工作。对于一篇冗长的帖子,我深表歉意,但我无法理解 Dask(我习惯于 MPI 和 OpenMP 编程)。
解决方案
但是,如何确保只有一个工作人员执行此功能?(在 MPI 中,我可以通过使用来做到这一点
rank == 0
;我没有找到任何类似的东西MPI_Comm_rank()
可以告诉我 Dask 中的工人编号或 ID)。
这实际上是if __name__ == '__main__'
块正在检查的内容。当您的脚本直接运行时,该条件为真;当工作人员将其作为模块导入时,情况并非如此。您放在此块之外的任何代码都由每个工人在启动时运行;这应该仅限于函数定义和必要的全局设置。所有实际工作的代码都需要在if __name__ == '__main__'
块中,或者在仅从该块内部调用的函数内部。
推荐阅读
- gitlab - 无法将变量添加到 gitlab
- android - android connect python websocket with 426 Upgrade Required 错误
- angularjs - 如何修复 ngOnInit 中未定义的可观察订阅返回
- html - MobileSafari 中的平滑滚动捕捉类型?
- php - 使用 For 循环创建关联数组
- api - 刷新令牌后旧令牌仍然有效 Owin OAuth
- javascript - 我使用 sendSignedTransaction,它给了我这个错误 "gas" is missing
- r - 为什么大型数据结构的 dump+source 比 save+load 或 saveRDS+readRDS 慢得多,即使 ascii=TRUE?
- python - 数据帧作为输入,数据帧作为输出
- html - 如何结合垂直对齐和缩进