c - MPI主进程收敛循环
问题描述
我正在尝试编写一个 MPI 程序来模拟整个网格的温度流动以达到平衡。我已经编写了串行版本,并分别使用 OpenMP、pthreads 和 CUDA 编写了并行版本。
我的目标是并行化一个为一维数组计算更新后温度值的 for 循环。需要并行化的代码如下(其余变量均已在上方初始化):
/* NOTE(review): fragment of a larger program. argc/argv, boxes, temps,
 * newtemps, neighbors, mults, epsilon and affect_rate are declared and
 * initialized above this excerpt (not shown here). */
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
/* Master/worker decomposition: rank 0 coordinates, ranks 1..nproc-1 compute.
 * The first `leftover` workers each receive one extra element. */
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int stop = 0;
int iter = 0;
float max_tmp;
float min_tmp;
/* BUG (the subject of this question): only rank 0 loops until convergence.
 * The worker branch below executes its recv/compute/send sequence exactly
 * once and then falls through to MPI_Finalize, so the sends issued here in
 * the second iteration are never matched by a receive -> deadlock. */
while(stop != 1){
offset = 0;
/* Distribute: send each worker its offset plus its slice of both arrays. */
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
/* Collect: the received offset (tag1) tells us where each worker's
 * updated slice belongs. */
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
/* NOTE(review): `idx` here relies on a declaration above this excerpt
 * (unlike the `int idx` loop earlier) — confirm against the full file. */
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
/* Converged when the temperature spread is within epsilon of the max. */
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
}
}
/* Worker branch: runs ONCE (no loop), which is the root cause of the hang. */
if (rank > 0){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
/* Each box's new temperature is a perimeter-weighted average of its
 * neighbors' temperatures, relaxed by affect_rate.
 * neighbors[j][0] holds the neighbor count; entries start at index 1. */
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
}
MPI_Finalize();
然而,我的程序成功完成了 while 循环的第一次迭代,求出的最大值也与我的串行版本一致,随后把 temps、newtemps 和 offset 变量发送给了各个进程。但在此之后程序就卡住了,各进程再也没有打印出收到消息的信息。控制台输出如下:
[radeymichael@owens-login04 ~]$ mpicc -o ci changeInput.c
[radeymichael@owens-login04 ~]$ mpirun -np 3 ./ci .1 .1
sent 101 temps to process: 1
sent 100 temps to process: 2
received 101 temps from process: 1
status: 1
received 101 temps from process: 0
status: 1
sent 101 temps to process: 0
received 100 temps from process: 0
status: 1
sent 100 temps to process: 0
received 100 temps from process: 2
status: 1
max: 900.000000
sent 101 temps to process: 1
sent 100 temps to process: 2
我花了很多时间试图找出错误,但我认为根本原因是我缺乏使用 MPI 的基础知识。如果有人能帮我找出我的误解所在,我将不胜感激。
解决方案
问题在于:等级 0(rank 0)位于 while 循环内,会一直发送数据直到 stop=1;而所有其他进程在执行完最后一次 MPI_Send 之后就会直接到达 MPI_Finalize。一种解决方案(见 @Gilles 的评论)是:为所有其他等级也添加一个基于 stop 的 while 循环,并由 root 进程通过下面的调用把 stop 广播给所有进程:
MPI_Bcast(&stop, 1, MPI_INT, 0, MPI_COMM_WORLD);
请参阅下面的代码。
/* Fixed version: `stop` is declared before the rank split so every rank has
 * it, the workers loop on it, and rank 0 broadcasts it each iteration so all
 * ranks exit together.
 * NOTE(review): fragment of a larger program — argc/argv, boxes, temps,
 * newtemps, neighbors, mults, epsilon and affect_rate come from above
 * this excerpt (not shown). */
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
/* First `leftover` workers get one extra element each. */
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
/* Shared convergence flag — visible to master AND workers (the fix). */
int stop = 0;
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int iter = 0;
float max_tmp;
float min_tmp;
while(stop != 1){
offset = 0;
/* Distribute each worker's offset and slices of both arrays. */
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
/* Collect updated slices; the received offset (tag1) locates each one. */
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
/* NOTE(review): `idx` here relies on a declaration above this excerpt
 * (unlike the `int idx` loop earlier) — confirm against the full file. */
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
/* Converged when the temperature spread is within epsilon of the max. */
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
/* Tell every worker whether to iterate again — matches the worker-side
 * MPI_Bcast below, keeping all ranks in lockstep. */
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
if (rank > 0){
/* Workers now loop in step with rank 0 instead of running once. */
while(stop != 1){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
/* New temperature = perimeter-weighted average of neighbor temps,
 * relaxed by affect_rate. neighbors[j][0] is the neighbor count. */
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
/* Blocks until rank 0 broadcasts the new `stop`; exits loop on stop==1. */
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
MPI_Finalize();
推荐阅读
- css - 如何从Angular中的子组件修改父组件的css属性
- mysql - 为什么使用 JSON_ARRAYAGG 进行 SELECT 时忽略 LIMIT?
- c# - 向 C# 编组脚本添加属性以处理 Unity 插件中的音量控制
- python-3.x - df.coalesce(1) 是什么意思?
- python - 我无法使用将变量从一个函数传递到另一个函数
- r - rbind 在一个数据帧中嵌套的数据帧列表
- javascript - 有没有办法监听 Firestore 功能的特定字段?
- can-bus - 如何在带有 ELM237 和 OBDII 的两 ECU 汽车中只与一个 ECU 通话
- sas - SAS引用一个字符串以供标签使用
- reactjs - Braintree-Web ApplePaySession 构造函数未定义