Load balancing MPI multithreading for variable-complexity tasks or variable-speed nodes?

Problem description

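I've written an MPI code that currently achieves multithreading by sending an equal number of elements from each array to different processes (so for 6 workers, the array is split into 6 equal parts). What I'd like to do instead is send small chunks only when a worker is ready to receive, and receive finished chunks without blocking future sends. That way, if one chunk takes 10 seconds while the others take 1 second, other data can be processed while waiting for the long chunk to finish.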

Here is some skeleton code I've put together:

#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>
#include <cstdio>

struct crazytaxi
{
    double a = 10.0;
    double b = 25.2;
    double c = 222.222;
};

int main(int argc, char** argv)
{
    //Initial and temp kanno vectors
    std::vector<crazytaxi> kanno;
    std::vector<crazytaxi> kanno_tmp;

    //init MPI
    MPI_Init(NULL,NULL);

    //allocate vector
    int SZ = 4200;
    kanno.resize(SZ);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD,&world_size);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);

    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i++)
        {
            kanno[i].a = 1.0*i;
            kanno[i].b = 10.0/(i+1);
        }
    }

    for (int j = 0; j < 10; j++) {

        //Make sure all processes have same kanno vector;
        if (world_rank == 0) {
            for (int i = 1; i < world_size; i++)
                MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
        } else {
            MPI_Recv(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,0,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        }

        //copy to tmp vector
        kanno_tmp = kanno;
        MPI_Barrier(MPI_COMM_WORLD);

        //the sender
        if (world_rank == 0) {
            unsigned p1 = 0;
            unsigned segment = 10;
            unsigned p2 = segment;
            while (p1 < SZ) {
                for (int i = 0; i < world_size; i++) {
                    //if (process #i is ready to receive)
                        //Send data in chunks of 10 to i
                    //else
                        //continue
                }
            }
        }
        if (world_rank != 0) {
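            //Receive data to be processed
            //Chunk bounds [p1, p2): placeholders until they are received from rank 0
            unsigned p1 = 0;
            unsigned p2 = 0;
            //do some math
            for (unsigned i = p1; i < p2; i++)
                kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);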

            //Send processed data to 0 and wait to receive new data.
        }

        //copy temp vector to kanno
        kanno = kanno_tmp;
    }

    //print some of the results;
    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i += 40)
            printf("Line %d: %lg,%lg\n",i,kanno[i].a,kanno[i].b);
    }

    MPI_Finalize();
}

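I can get this about 90% of the way to what I want, except that either my MPI_Send and MPI_Recv calls block, or the "master" process has no way of knowing that a "worker" process is ready to receive data.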

Is there a way in MPI to do something like this:

unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
    if (world_rank == 0) {
        for (int i = 1; i < world_size; i++)
        {
            if (<process i is ready to receive>) {
                MPI_Send([...]);
                Datapointer += 10;
            }
            if (<process i has sent data>)
                MPI_Recv([...]);
            if (Datapointer > array_size) {
                MPI_Bcast([killswitch]);
                break;
            }
        }
    }
}
MPI_Barrier();

Or is there a more efficient way to structure this for variable-complexity chunks or variable-speed nodes?

Tags: performance, mpi

Solution


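As @Gilles Gouaillardet pointed out, the key in this scenario is MPI_ANY_SOURCE. With it, a process can receive a message from any source. To find out which process sent the message, read status.MPI_SOURCE from the status filled in by the receive call.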

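// Fragment: assumes rank and world_size were obtained as in the question's code.
// TAG_WORK, TAG_STOP and more_work are illustrative names, not part of MPI.
const int TAG_WORK = 1;   // message carries a chunk or a result
const int TAG_STOP = 2;   // message tells a worker to exit
int buf[32];
MPI_Status status;
if (rank == 0) {
  // send initial work to all processes
  for (int i = 1; i < world_size; i++)
    MPI_Send(buf, 32, MPI_INT, i, TAG_WORK, MPI_COMM_WORLD);
  int active = world_size - 1;   // workers that have not been stopped yet
  while (active > 0) {
    // a result arrives from whichever worker finishes first
    MPI_Recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    // do the distribution logic: store the result, pick the next chunk
    bool more_work = false;   // placeholder: true while chunks remain
    if (more_work) {
      MPI_Send(buf, 32, MPI_INT, status.MPI_SOURCE, TAG_WORK, MPI_COMM_WORLD);
    } else {
      // the work is over: tell this worker to stop waiting for work
      MPI_Send(buf, 32, MPI_INT, status.MPI_SOURCE, TAG_STOP, MPI_COMM_WORLD);
      active--;
    }
  }
}
else {
  while (true) {
    // receive work from rank 0
    MPI_Recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    // leave the loop when rank 0 sends the special stop message
    if (status.MPI_TAG == TAG_STOP) break;
    // perform computation and send back the result
    MPI_Send(buf, 32, MPI_INT, 0, TAG_WORK, MPI_COMM_WORLD);
  }
}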
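
The "do the distribution logic" step above is left open. One possible way to fill it in is a small bookkeeping helper on rank 0 that hands out index ranges and counts how many results are still outstanding. The sketch below is only an illustration and deliberately contains no MPI calls; the class name ChunkDispenser and its methods next, complete and finished are hypothetical and do not come from the answer or from any library.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of MPI): hands out [begin, end) index ranges
// of at most `chunk` elements until `total` elements have been assigned.
class ChunkDispenser {
public:
    ChunkDispenser(std::size_t total, std::size_t chunk)
        : total_(total), chunk_(chunk), next_(0), outstanding_(0) {
        assert(chunk > 0 && "chunk size must be positive");
    }

    // Writes the next unassigned range to begin/end and returns true,
    // or returns false (leaving begin/end untouched) when nothing is left.
    bool next(std::size_t& begin, std::size_t& end) {
        if (next_ >= total_) return false;
        begin = next_;
        // The last range may be shorter than `chunk`.
        end = (total_ - begin < chunk_) ? total_ : begin + chunk_;
        next_ = end;
        ++outstanding_;
        return true;
    }

    // Call once for every result received back from a worker.
    void complete() {
        assert(outstanding_ > 0);
        --outstanding_;
    }

    // True when every range has been handed out and every result is back.
    bool finished() const { return next_ >= total_ && outstanding_ == 0; }

private:
    std::size_t total_;
    std::size_t chunk_;
    std::size_t next_;         // first index not yet handed out
    std::size_t outstanding_;  // ranges handed out but not yet completed
};
```

In the master loop, rank 0 would call next() before each send of work to status.MPI_SOURCE, call complete() after each receive, and send the stop message once next() returns false. With the question's SZ = 4200 and segment = 10, this would produce 420 ranges.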
