首页 > 解决方案 > 将普通 python 代码转换为 MPI 代码

问题描述

我有这个代码,我想编辑它并将其作为 MPI 代码运行。代码中的数组mass_array1是一个多维数组,总“迭代次数”i*j约为 8000 万次。我的意思是,如果我将数组展平为一维数组,则有 8000 万个元素。

该代码需要将近 2 天的时间才能运行,这很烦人,因为它只是整个项目的一小部分。由于我可以登录到集群并通过 20 个左右的处理器(甚至更多)运行代码,有人可以帮我将此代码编辑为 MPI 代码吗?

即使用 C 语言编写 MPI 代码也可以。

#Alotting Black Holes at z=6
from tqdm import tqdm
bhs=[0]*1000

for i in tqdm(range(0,1000),leave=True):
    bhs[i]=np.zeros(len(mass_array1[i]))
    for j in range (len(mass_array1[i])):
        bhs[i][j]=np.random.lognormal(np.log(Mbhthfit6(mass_array1[i],6)[j]),np.log(5))

当前在该集群上使用 MPI 的 C 程序:

int main(int argc,char **argv){
float epsran;
FILE *fp;
char str[256];
fp=fopen("parameterfile.dat","w");
fprintf(fp,
   " cosmological parameter\n"
       "h:%f\n"
   "omegam:%f\n"
   "omegab:%f\n"
   "omegal:%f\n"
   "sigma8:%f\n"
   "rho0mMpc:%e\n"
   "alpha:%f\n"
   "deltac:%f\n",ndh,
   omegam,omegab,omegal,sigma8,rho0mMpc,alpha,deltac);
fclose(fp);
/* MPI test */
int i,Petot,MyRank;
clock_t start,end;
start = clock();
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &Petot);
MPI_Comm_rank(MPI_COMM_WORLD, &MyRank);
srand((unsigned)(time(NULL)+MyRank));
//printf ("Hello World %d\n%d", MyRank,Petot);
float samples[100];
for(i=0;i<100/Petot;i++){
  samples[i]=halo_samples(1.68,1000);
    outputS(235,30,varipsapp(samples[i],0),MyRank*(100/Petot)+i);
}
printf("Length:%d",(int)(sizeof(samples)/sizeof(samples[0])));
/*    FILE *fpw;
fpw = fopen("Minitial.dat","w");
for(i=0;i<MyRank*(100/Petot);i++){
  fprintf(fpw,"%f\n",samples[i]);
  }
  fclose(fpw);*/
MPI_Finalize();
end = clock();
  }

提交工作

在此之后,有一个如下所示的 job.sh 文件:

#!/bin/sh     
#$ -S /bin/sh                                                                  
#$ -cwd                                          
#$ -V
#$ -N mergertree
#$ -q all.q@messier04
#$ -q all.q@messier05
#$ -pe openmpi10 20 
#$ -o resultfile/out.txt
#$ -e resultfile/error.txt
                                                       
mpirun -np $NSLOTS ./a.out

Mbhfit6

这就是我在代码中定义 Mbhfit6 的方式:

def Mbhthfit6(Mdm,z):
    a= 5.00041824
    b= 0.31992748
    Mbhth=(10**a)*(Mdm**b)
    return Mbhth

mass_array1

在这里,我上传了一个包含 mass_array1 数据的文件(zip 格式)。 https://drive.google.com/file/d/1C-G28OSND7jxqkFZQS3dlW6_40yBN6Fy/view?usp=sharing

您需要将文件解压缩到一个文件夹中,然后使用下面的代码在 Python 中将其导入

这是我导入文件的代码:(只有 3 MB)

#import all the files from directory
dirlist=["bh2e8"]
import time

mass_array1=[0]*1000
#print(mass_array)
#read all the files 
for i,X in enumerate(dirlist):
    exec('filelist=glob.glob("%s/test*.dat")'%(X))
    #exec("mass_array%s=[]"%X)
    initial_mass=[]
    for j,Y in tqdm(enumerate(filelist),position=0, leave=True, total=1000):
        Y=Y.replace(os.sep, '/')
        #Z=int(Y[10:13])
        Z=int(re.findall("\d+", Y)[2])
        #print(Z)
        mass_array1[Z]=[]
        #print('i=',Z,end="\r")
        #print('i=',Z,end="\r")
        exec("initial_partial=np.loadtxt('%s',max_rows=1)"%(Y))
        exec("initial_mass=np.append(initial_mass,initial_partial)")
        exec("mass_partial=np.loadtxt('%s',skiprows=1)"%(Y))
        mass_array1[Z]=np.append(mass_partial,mass_array1[Z])
        #mass_array1[Z]=mass_partial

标签: pythoncperformancemultiprocessingmpi

解决方案


如果您采用有效的方法来处理数据,我不认为这是需要 mpi 的足够大的数据集。

正如我在评论中提到的,我发现处理大量数值数据的最佳方法是首先使用 numpy 矢量化,然后尝试使用 numba jit 编译,最后使用多核处理。一般来说,这是从最容易到最难的顺序,也可以让你以最少的工作获得最快的速度。在您的情况下,我认为矢量化确实是要走的路,当我这样做的时候,我做了一些不是真正必要的重新组织,但帮助我跟踪数据。

import numpy as np
from pathlib import Path
import re

dirlist=[r"C:\Users\aaron\Downloads\bh2e8"]
dirlist = [Path(d) for d in dirlist] #convert directory paths to pathlib.Path objects for ease of file system manipulation

initial_mass = {} #use a dictionary so we don't have to preallocate indices
mass_array = {} #use a dictionary so we don't have to preallocate indices

for dir_path in dirlist:
    for child in dir_path.iterdir():
        m = re.match(".*?test(?P<index>\d+)\.dat$", str(child))
        if m: #if we match the end of the child path as a testxxx.dat file (not another directory or some other file type)
            file_index = int(m["index"])
            with child.open() as f:
                arr = [float(line) for line in f if line.strip()] #1d array of float numbers skipping any empty lines
            initial_mass[file_index] = arr[0]
            mass_array[file_index] = np.array(arr[1:])

我开始以稍微不同的方式读取数据,因为我发现创建数组字典更自然,因此创建它们的顺序无关紧要。文件的索引(文件名末尾的数字)用作字典的键,因此如果您想要使用以下内容很容易将其转换回列表:mass_array = list(mass_array[i] for i in range(1000))

j然后查看其余代码,您使用的所有numpy函数一次处理整个数据数组的速度比使用内部循环(重写正文以使用矢量化:


#Alotting Black Holes at z=6

bhs={} #use a dictionary to avoid the need for preallocation

for i, arr in mass_array.items(): #items in python3 iteritems in python2
    
    #inline Mbhthfit6 function, and calculate using vectorization (compute an entire array at once per iteration of `i`)
    bhs[i] = np.random.lognormal(
                                np.log((10**5.00041824)*(arr**0.31992748)),
                                np.log(5)
                                )

再次,如果您想将bhs字典转换回以前的列表,这很简单:bhs = list(bhs[i] for i in range(1000))

通过这些更改(以及相对强大的 PC),代码在您提供的数据文件上执行不到半秒。在示例数据集中只有超过 700,000 个值,如果我们推断出 8000 万个,那应该是一两分钟的量级。

PS如果您发现自己使用exec了很多生成的代码字符串,您几乎总是会发现有更好的方法来完成相同的事情,通常只需稍微不同的数据结构。


推荐阅读