首页 > 解决方案 > 在函数调用中使用共享大内存的 dask

问题描述

我在一个有 4 个节点的 HPC 集群上使用 Dask,每个节点有 12 个核心。我的代码是纯 Python 处理列表和集合,并在紧密的 Python for 循环中进行大部分计算。我在这里阅读了一个答案,该答案建议使用更多的进程和更少的线程进行此类计算。

如果我 client = Client(n_workers=24, threads_per_worker=2) 在使用 Python 列表时进行计算.map()并将.compute()工作并行拆分为 48 个块?GIL 不是只允许一个线程,因此只允许 24 个并行计算吗?编辑:如果我使用多处理模块并调用线程池,在单个节点上它会更快吗?我可以dask与多处理模块中的 4 个工作人员(每个节点 1 个工作人员)和 12 个线程池一起使用吗?

我的精简代码如下所示:

b = db.from_sequence([some_list], npartitions=48).map(my_func, g, k)
m_op = b.compute()

def my_func(g, k):
    # several for loops
    return

数据g是一个相当大的列表,如果我使用更多的进程,这会重复,因此成为瓶颈。我还尝试使用 gx = dask.delayed(g)并传递gx给该函数。这也是内存和时间的消耗。我明白(从stackoverflow上的答案),我可以使用: [future] = c.scatter([g]) 但如果我所有的工人随机使用数据g,我将不得不这样做broadcast,这将再次消耗内存。请注意,我没有修改g我的功能。解决这个问题的正确方法是什么?

另一个小的观察/问题dask是: my_func正在搜索某物,并返回找到的元素列表。如果特定工作人员没有找到元素,则返回一个空列表。最后连接输出,我有一段丑陋的代码,如下所示:

for sl in m_op:                                                                                                                                                               
    for item in sl:                                                                                                                                                                    
        if item != []:                                                                                                                                                                 
            nm_op.append(item) 

有一个更好的方法吗?非常感谢您的时间。

标签: pythondaskdask-distributed

解决方案


推荐阅读