performance - 为可变复杂性任务或可变速度节点负载平衡 MPI 多线程?
问题描述
我编写了一个 MPI 代码,该代码当前通过将相等数量的元素从每个数组发送到不同的进程来完成工作(因此,对于 6 个工作人员,数组被分成 6 个相等的部分)来实现多线程。我想做的是仅在工作人员准备好接收时发送小块,并在不阻塞未来发送的情况下接收完成的块;这样,如果一个块需要 10 秒而其他块需要 1 秒,则可以在等待长块完成时处理其他数据。
这是我整理的一些骨架代码:
#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>
struct crazytaxi
{
double a = 10.0;
double b = 25.2;
double c = 222.222;
};
int main(int argc, char** argv)
{
//Initial and temp kanno vectors
std::vector<crazytaxi> kanno;
std::vector<crazytaxi> kanno_tmp;
//init MPI
MPI_Init(NULL,NULL);
//allocate vector
int SZ = 4200;
kanno.resize(SZ);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD,&world_size);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);
if (world_rank == 0)
{
for (int i = 0; i < SZ; i++)
kanno[i].a = 1.0*i;
kanno[i].b = 10.0/(i+1);
}
for (int j = 0; j < 10; j++) {
//Make sure all processes have same kanno vector;
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
} else {
MPI_Recv(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,0,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
}
//copy to tmp vector
kanno_tmp = kanno;
MPI_Barrier();
//the sender
if (world_rank == 0) {
unsigned p1 = 0;
unsigned segment = 10;
unsigned p2 = segment;
while (p1 < SZ) {
for (int i = 0; i < world_size; i++) {
//if (process #i is ready to receive)
//Send data in chunks of 10 to i
//else
//continue
}
}
}
if (world_rank != 0) {
//Receive data to be processed
//do some math
for (unsigned i = p1; i < p2; i++)
kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);
//Send processed data to 0 and wait to receive new data.
}
//copy temp vector to kanno
kanno = kanno_tmp;
}
//print some of the results;
if (world_rank == 0)
{
for (int i = 0; i < SZ; i += 40)
printf("Line %d: %lg,%lg\n",i,kanno[i].a,kanno[i].b);
}
MPI_Finalize();
}
我可以 90% 把它变成我想要的,除了我的 MPI_Send 和 MPI_Recv 调用会阻塞,或者“主”进程不会知道“从”进程已准备好接收数据。
MPI中有没有办法做类似的事情
unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
{
if (<process i is ready to receive>) {
MPI_Send([...]);
Datapointer += 10;
}
if (<process i has sent data>)
MPI_Recv([...]);
if (Datapointer > array_size) {
MPI_Bcast([killswitch]);
break;
}
}
}
}
MPI_Barrier();
或者有没有更有效的方法来为可变复杂度块或变速节点构建它?
解决方案
正如@Gilles Gouaillardet 所指出的那样,这种情况下的关键字是MPI_ANY_SOURCE
. 使用它,进程可以从任何来源接收消息。要知道哪个进程发送该消息,您可以使用呼叫status.MPI_SOURCE
状态。recv
MPI_Status status;
if(rank == 0) {
//send initial work to all processes
while(true) {
MPI_recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// do the distribution logic
MPI_send(buf, 32, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD);
// break out of the loop once the work is over and send all the processes
message to stop waiting for work
}
}
else {
while(true){
// receive work from rank 0
MPI_recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// Perform computation and send back the result
MPI_send(buf, 32, MPI_INT, 0, tag, MPI_COMM_WORLD);
//break this until asked by master 0 using some kind of special message
}
}
推荐阅读
- apache-flink - 如何根据事件处理已经可用的状态在 flink 中来自不同的流
- python - 属性的 Python 数据类后初始化
- javascript - 将 python pyDes 转换为 CryptoJS
- sql - 在过滤器的where子句中匹配多行
- eclipse - 无法使用 Eclipse IDE 在 chrome 上启动网页
- java - 在 Java 中将形状文件转换为 RDF 文档
- laravel - 具有多个属性的 Laravel 请求验证自定义函数
- java - Android - 如何使用手机拼写检查/单词预测并将我的自定义完成添加到我的自定义键盘?
- python - 遍历 4d 和 3d 数组并再次返回 4d 形状的值
- c# - 如何使用 itextsharp 在 C# 中更改格式?