首页 > 解决方案 > 更新 mpi4py 中的字典值

问题描述

我们如何跨不同处理器更新 MPI(特别是 mpi4py)中的一个全局字典。广播后我现在遇到的问题是不同的处理器无法看到其他处理器对字典的更改(更新)。

例如输入数据如下:

   col1  col2
   -----------
    a      1
    a      1
    b      2
    c      3
    c      1

输出字典应如下所示:

  {'a': 2, 'b': 2, 'c': 4}

这意味着输入中的 col2 被加在一起并为键 (col1) 创建了值。字典最初是空的,并且在所有处理器的并行处理过程中得到更新(至少这是我们正在尝试做的)。

标签: pythonperformanceparallel-processingmpimpi4py

解决方案


我们如何跨不同处理器更新 MPI(特别是 mpi4py)中的一个全局字典。广播后我现在遇到的问题是不同的处理器无法看到其他处理器对字典的更改(更新)。

首先,您需要了解在 MPI 中,每个 MPI 进程都运行一个完整的程序副本。因此,在该程序上分配的所有数据对于每个进程都是私有的。

让我们看下面的例子:

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    dictionary = {'a': 1, 'c': 3}
    for i in range(1, size, 1):
        data = comm.recv(source=i, tag=11)
        for key in data:
            if key in dictionary:
               dictionary[key] = dictionary[key] + data[key]
            else:
               dictionary[key] = data[key] 
    print(dictionary)
else:
    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

在这段代码中,进程rank=0分配了 a dictionary,它是该进程私有的,同样,它data = {'a': 1, 'b': 2, 'c': 1}是其他每个进程私有的。如果(例如)一个进程更改了 variable size,那么其他进程将看不到该更改。

在这段代码中,所有进程都发送他们的字典副本:

    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

到进程 0,它调用comm.recv其他每个进程:

for i in range(1, size, 1):
    data = comm.recv(source=i, tag=11)

并将接收到的数据(从其他进程)合并到自己的字典中:

    for key in data:
        if key in dictionary:
           dictionary[key] = dictionary[key] + data[key]
        else:
           dictionary[key] = data[key] 

最后,只有进程 0 拥有完整的dictionary. 当您进行广播时,您也发生了同样的事情。尽管如此,MPI 确实有允许您在所有进程中拥有完整的例程(即)。 comm.Allgatherdictionary

此类代码的示例(您只需要适应字典):

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)

print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
MacBook-Pro-de-Bruno:Python dreamcrash$ 

字典最初是空的,并且在所有处理器的并行处理期间得到更新(至少这是我们正在尝试做的)。

使用上述模型(分布式内存范例),每次其中一个进程更改字典时,您都需要显式地与所有进程通信。这意味着您必须事先知道代码中应该进行这些通信的点。

但是,根据您的文本,您似乎需要一种共享内存方法,其中一个进程将更新字典,例如如下:

    if key in dictionary:
       dictionary[key] = dictionary[key] + data[key]
    else:
       dictionary[key] = data[key] 

并且这些更改将立即对所有进程可见。就像多线程代码中发生的事情一样。

MPI 3.0 引入了共享内存的概念,人们实际上可以实现这一点。

下面是一个使用数组的例子:

from mpi4py import MPI 
import numpy as np 

comm = MPI.COMM_WORLD 

size = 1000 
itemsize = MPI.DOUBLE.Get_size() 
if comm.Get_rank() == 0: 
   nbytes = size * itemsize 
else: 
   nbytes = 0 

win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm) 

buf, itemsize = win.Shared_query(0) 
assert itemsize == MPI.DOUBLE.Get_size() 
buf = np.array(buf, dtype='B', copy=False) 
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,)) 

if comm.rank == 1: 
  ary[:5] = np.arange(5) 
 
comm.Barrier() 
if comm.rank == 0: 
  print(ary[:10])

代码不是我的,它来自这里


推荐阅读