python - 将普通 python 代码转换为 MPI 代码
问题描述
我有这个代码,我想编辑它并将其作为 MPI 代码运行。代码中的数组mass_array1
是一个多维数组,总“迭代次数”i*j
约为 8000 万次。我的意思是,如果我将数组展平为一维数组,则有 8000 万个元素。
该代码需要将近 2 天的时间才能运行,这很烦人,因为它只是整个项目的一小部分。由于我可以登录到集群并通过 20 个左右的处理器(甚至更多)运行代码,有人可以帮我将此代码编辑为 MPI 代码吗?
即使用 C 语言编写 MPI 代码也可以。
#Alotting Black Holes at z=6
from tqdm import tqdm
bhs=[0]*1000
for i in tqdm(range(0,1000),leave=True):
bhs[i]=np.zeros(len(mass_array1[i]))
for j in range (len(mass_array1[i])):
bhs[i][j]=np.random.lognormal(np.log(Mbhthfit6(mass_array1[i],6)[j]),np.log(5))
当前在该集群上使用 MPI 的 C 程序:
int main(int argc,char **argv){
float epsran;
FILE *fp;
char str[256];
fp=fopen("parameterfile.dat","w");
fprintf(fp,
" cosmological parameter\n"
"h:%f\n"
"omegam:%f\n"
"omegab:%f\n"
"omegal:%f\n"
"sigma8:%f\n"
"rho0mMpc:%e\n"
"alpha:%f\n"
"deltac:%f\n",ndh,
omegam,omegab,omegal,sigma8,rho0mMpc,alpha,deltac);
fclose(fp);
/* MPI test */
int i,Petot,MyRank;
clock_t start,end;
start = clock();
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &Petot);
MPI_Comm_rank(MPI_COMM_WORLD, &MyRank);
srand((unsigned)(time(NULL)+MyRank));
//printf ("Hello World %d\n%d", MyRank,Petot);
float samples[100];
for(i=0;i<100/Petot;i++){
samples[i]=halo_samples(1.68,1000);
outputS(235,30,varipsapp(samples[i],0),MyRank*(100/Petot)+i);
}
printf("Length:%d",(int)(sizeof(samples)/sizeof(samples[0])));
/* FILE *fpw;
fpw = fopen("Minitial.dat","w");
for(i=0;i<MyRank*(100/Petot);i++){
fprintf(fpw,"%f\n",samples[i]);
}
fclose(fpw);*/
MPI_Finalize();
end = clock();
}
提交工作
在此之后,有一个如下所示的 job.sh 文件:
#!/bin/sh
#$ -S /bin/sh
#$ -cwd
#$ -V
#$ -N mergertree
#$ -q all.q@messier04
#$ -q all.q@messier05
#$ -pe openmpi10 20
#$ -o resultfile/out.txt
#$ -e resultfile/error.txt
mpirun -np $NSLOTS ./a.out
Mbhfit6
这就是我在代码中定义 Mbhfit6 的方式:
def Mbhthfit6(Mdm,z):
a= 5.00041824
b= 0.31992748
Mbhth=(10**a)*(Mdm**b)
return Mbhth
mass_array1
在这里,我上传了一个包含 mass_array1 数据的文件(zip 格式)。 https://drive.google.com/file/d/1C-G28OSND7jxqkFZQS3dlW6_40yBN6Fy/view?usp=sharing
您需要将文件解压缩到一个文件夹中,然后使用下面的代码在 Python 中将其导入
这是我导入文件的代码:(只有 3 MB)
#import all the files from directory
dirlist=["bh2e8"]
import time
mass_array1=[0]*1000
#print(mass_array)
#read all the files
for i,X in enumerate(dirlist):
exec('filelist=glob.glob("%s/test*.dat")'%(X))
#exec("mass_array%s=[]"%X)
initial_mass=[]
for j,Y in tqdm(enumerate(filelist),position=0, leave=True, total=1000):
Y=Y.replace(os.sep, '/')
#Z=int(Y[10:13])
Z=int(re.findall("\d+", Y)[2])
#print(Z)
mass_array1[Z]=[]
#print('i=',Z,end="\r")
#print('i=',Z,end="\r")
exec("initial_partial=np.loadtxt('%s',max_rows=1)"%(Y))
exec("initial_mass=np.append(initial_mass,initial_partial)")
exec("mass_partial=np.loadtxt('%s',skiprows=1)"%(Y))
mass_array1[Z]=np.append(mass_partial,mass_array1[Z])
#mass_array1[Z]=mass_partial
解决方案
如果您采用有效的方法来处理数据,我不认为这是需要 mpi 的足够大的数据集。
正如我在评论中提到的,我发现处理大量数值数据的最佳方法是首先使用 numpy 矢量化,然后尝试使用 numba jit 编译,最后使用多核处理。一般来说,这是从最容易到最难的顺序,也可以让你以最少的工作获得最快的速度。在您的情况下,我认为矢量化确实是要走的路,当我这样做的时候,我做了一些不是真正必要的重新组织,但帮助我跟踪数据。
import numpy as np
from pathlib import Path
import re
dirlist=[r"C:\Users\aaron\Downloads\bh2e8"]
dirlist = [Path(d) for d in dirlist] #convert directory paths to pathlib.Path objects for ease of file system manipulation
initial_mass = {} #use a dictionary so we don't have to preallocate indices
mass_array = {} #use a dictionary so we don't have to preallocate indices
for dir_path in dirlist:
for child in dir_path.iterdir():
m = re.match(".*?test(?P<index>\d+)\.dat$", str(child))
if m: #if we match the end of the child path as a testxxx.dat file (not another directory or some other file type)
file_index = int(m["index"])
with child.open() as f:
arr = [float(line) for line in f if line.strip()] #1d array of float numbers skipping any empty lines
initial_mass[file_index] = arr[0]
mass_array[file_index] = np.array(arr[1:])
我开始以稍微不同的方式读取数据,因为我发现创建数组字典更自然,因此创建它们的顺序无关紧要。文件的索引(文件名末尾的数字)用作字典的键,因此如果您想要使用以下内容很容易将其转换回列表:mass_array = list(mass_array[i] for i in range(1000))
j
然后查看其余代码,您使用的所有numpy函数一次处理整个数据数组的速度比使用内部循环(重写正文以使用矢量化:
#Alotting Black Holes at z=6
bhs={} #use a dictionary to avoid the need for preallocation
for i, arr in mass_array.items(): #items in python3 iteritems in python2
#inline Mbhthfit6 function, and calculate using vectorization (compute an entire array at once per iteration of `i`)
bhs[i] = np.random.lognormal(
np.log((10**5.00041824)*(arr**0.31992748)),
np.log(5)
)
再次,如果您想将bhs
字典转换回以前的列表,这很简单:bhs = list(bhs[i] for i in range(1000))
通过这些更改(以及相对强大的 PC),代码在您提供的数据文件上执行不到半秒。在示例数据集中只有超过 700,000 个值,如果我们推断出 8000 万个,那应该是一两分钟的量级。
PS如果您发现自己使用exec
了很多生成的代码字符串,您几乎总是会发现有更好的方法来完成相同的事情,通常只需稍微不同的数据结构。
推荐阅读
- android - 转换错误 Mongodb Nodejs
- sql - 创建月度报告
- bash - 了解 sed 命令
- javascript - React onDrag 对于 screenX/screenY 总是以 0 结尾
- python - 将 python 字典转换为 MATLAB
- php - 为什么 eloquent 中的胶囊在 slim 中需要 setAsglobal() 方法
- c# - 在 C# 中创建泛型函数以将数组转换为字典
- java - Spring Boot 请求:“在启用 'debug' 的情况下重新运行您的应用程序” - 我该怎么做?
- angular - Angular:从静态变量访问@ngx-translate/core
- ios - 如何从 Model > NSDictionary 获取值并解析到另一个视图控制器?