An efficient design pattern for sending to and receiving from all MPI processes: MPI all-to-all communication

Problem Description

I am trying to send a message from one process to all MPI processes and also to receive messages from all of those processes in one process. It is basically an all-to-all communication where every process sends a message to every other process (except itself) and receives a message from every other process.

The following example code snippet shows what I am trying to achieve. The problem with MPI_Send is its behavior: for smaller message sizes it acts as non-blocking, but for larger messages (on my machine, BUFFER_SIZE 16400) it blocks. I am aware that this is how MPI_Send behaves. As a workaround, I replaced the code below with a combined blocking send and receive, i.e. MPI_Sendrecv, called like this: MPI_Sendrecv(intSendPack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, intReceivePack, BUFFER_SIZE, MPI_INT, processId, MPI_TAG, MPI_COMM_WORLD, MPI_STATUSES_IGNORE). I make this call for every process of MPI_COMM_WORLD inside a loop on every rank, and this approach gives me what I am trying to achieve (all-to-all communication). However, the call takes a lot of time, which I want to cut down with some more time-efficient approach. I have tried MPI scatter and gather to perform the all-to-all communication, but one issue is that the buffer size (16400) may differ across iterations of the MPI_all_to_all function call in the actual implementation. Here I use MPI_TAG to differentiate the calls in different iterations, which I cannot do with the scatter and gather functions.

#include <mpi.h>

#define BUFFER_SIZE 16400

void MPI_all_to_all(int MPI_TAG)
{
    int size;
    int rank;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* intReceivePack = new int[BUFFER_SIZE]();

    // Send the buffer to every other rank
    for (int prId = 0; prId < size; prId++) {
        if (prId != rank) {
            MPI_Send(intSendPack, BUFFER_SIZE, MPI_INT, prId, MPI_TAG,
                     MPI_COMM_WORLD);
        }
    }

    // Receive a buffer from every other rank
    for (int sId = 0; sId < size; sId++) {
        if (sId != rank) {
            MPI_Recv(intReceivePack, BUFFER_SIZE, MPI_INT, sId, MPI_TAG,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }

    delete[] intSendPack;
    delete[] intReceivePack;
}
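
For reference, the MPI_Sendrecv workaround described above would look roughly like the following loop (a sketch assembled from the call quoted in the question; intReceivePack is overwritten on every iteration, just as in the MPI_Recv loop above):

    for (int prId = 0; prId < size; prId++) {
        if (prId != rank) {
            // Blocking combined send+receive with the same peer avoids the
            // ordering problem of separate MPI_Send/MPI_Recv loops.
            MPI_Sendrecv(intSendPack, BUFFER_SIZE, MPI_INT, prId, MPI_TAG,
                         intReceivePack, BUFFER_SIZE, MPI_INT, prId, MPI_TAG,
                         MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }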

I would like to know whether there is a way to perform this all-to-all communication with any efficient communication model. I am not wedded to MPI_Send; if there is some other way that gives me what I am trying to achieve, I am happy with that. Any help or suggestion is much appreciated.

Tags: c++, mpi, blocking

Solution


Here is a benchmark that allows you to compare the performance of collective communication versus point-to-point communication in an all-to-all exchange:

#include <iostream>
#include <algorithm>
#include <mpi.h>

#define BUFFER_SIZE 16384

void point2point(int*, int*, int, int);

int main(int argc, char *argv[])
{
    MPI_Init(&argc, &argv);

    int rank_id = 0, com_sz = 0;
    double t0 = 0.0, tf = 0.0;
    MPI_Comm_size(MPI_COMM_WORLD, &com_sz);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank_id);

    int* intSendPack = new int[BUFFER_SIZE]();
    int* result = new int[BUFFER_SIZE*com_sz]();
    std::fill(intSendPack, intSendPack + BUFFER_SIZE, rank_id);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);

    // Send-Receive
    t0 = MPI_Wtime();
    point2point(intSendPack, result, rank_id, com_sz);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Send-receive time: " << tf - t0 << std::endl;

    // Collective
    std::fill(result, result + BUFFER_SIZE*com_sz, 0);
    std::fill(result + BUFFER_SIZE*rank_id, result + BUFFER_SIZE*(rank_id+1), rank_id);
    t0 = MPI_Wtime();
    MPI_Allgather(intSendPack, BUFFER_SIZE, MPI_INT, result, BUFFER_SIZE, MPI_INT, MPI_COMM_WORLD);
    MPI_Barrier(MPI_COMM_WORLD);
    tf = MPI_Wtime();
    if (!rank_id)
        std::cout << "Allgather time: " << tf - t0 << std::endl;

    MPI_Finalize();
    delete[] intSendPack;
    delete[] result;
    return 0;
}

// Send/receive communication
void point2point(int* send_buf, int* result, int rank_id, int com_sz)
{
    MPI_Status status;
    // Exchange and store the data
    for (int i=0; i<com_sz; i++){
        if (i != rank_id){
            MPI_Sendrecv(send_buf, BUFFER_SIZE, MPI_INT, i, 0, 
                result + i*BUFFER_SIZE, BUFFER_SIZE, MPI_INT, i, 0, MPI_COMM_WORLD, &status);
        }
    }
}

Here, every rank contributes its own array intSendPack to the array result on all other ranks, and result should end up identical on all of them. result is flat; each rank's contribution occupies BUFFER_SIZE entries starting at rank_id*BUFFER_SIZE. After the point-to-point communication, the result array is reset to its initial state before the collective run.
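
As a small illustrative example (values chosen here, not from the original answer): with 3 ranks and a buffer size of 4, every rank's result should end up as

    0 0 0 0   1 1 1 1   2 2 2 2

after either the point-to-point exchange or the Allgather.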

Timing is measured by setting up an MPI_Barrier before the final timestamp, which lets you capture the maximum time across all ranks.

I ran the benchmark on 1 node of NERSC Cori KNL using Slurm. I ran each case a few times just to make sure the values were consistent and I was not looking at an outlier, but you should run it maybe 10 or so times to collect more proper statistics.

Here are some thoughts:

  • For a small number of processes (5) and a large buffer size (16384), collective communication is about twice as fast as point-to-point, and it becomes about 4-5 times faster when moving to a larger number of ranks (64).
  • In this benchmark there is not much difference between the recommended Slurm settings on that specific machine and the default settings, but in real, larger programs with more communication the difference is very significant (a job that runs for less than a minute with the recommended settings runs for 20-30 minutes or more with the defaults). The point is: check your settings, it may make a difference.
  • What you were seeing with Send/Receive for larger messages was actually a deadlock: with large messages every rank blocks in MPI_Send until a matching receive is posted, but no rank ever reaches its MPI_Recv loop. I saw it too for the message size shown in this benchmark. In case you missed those, there are two SO posts worth reading on it: a buffering explanation and a word on deadlocking.

In summary, adjust this benchmark to represent your code more closely and run it on your system, but collective communication in all-to-all or one-to-all situations should be faster because of dedicated optimizations such as superior algorithms used to arrange the communication. A 2-5x speedup is considerable, since communication often contributes the most to the overall runtime.
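
One concern raised in the question is that the buffer size may differ between iterations and that MPI_TAG cannot be passed to collectives. With collectives no tag is needed, because calls are matched by the order in which they are issued on a communicator. If the per-rank amount of data varies, the variable-count collective MPI_Allgatherv can be used instead of MPI_Allgather; below is a minimal sketch (the names all_gather_variable, send_buf, and my_count are illustrative, not from the original post):

#include <mpi.h>
#include <vector>

// Sketch: all-to-all exchange where each rank contributes a different number
// of ints. Each rank first shares its own count, then displacements are
// computed from the gathered counts before the variable-count gather.
void all_gather_variable(int* send_buf, int my_count, MPI_Comm comm)
{
    int size;
    MPI_Comm_size(comm, &size);

    std::vector<int> counts(size), displs(size);
    MPI_Allgather(&my_count, 1, MPI_INT, counts.data(), 1, MPI_INT, comm);

    int total = 0;
    for (int i = 0; i < size; ++i) { displs[i] = total; total += counts[i]; }

    // Gather everyone's data; rank i's block starts at displs[i] in result.
    std::vector<int> result(total);
    MPI_Allgatherv(send_buf, my_count, MPI_INT,
                   result.data(), counts.data(), displs.data(), MPI_INT, comm);
}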

