首页 > 解决方案 > MPI_Reduce w 用户函数和非连续数据

问题描述

我正在尝试做一些简单的 MPI 项目(w MPICH),但在这样做时我遇到了一个我既不理解也无法解决的问题(可能是因为我误解了文档)。所以我基本上想做的是将一个结构传递给 MPI_Reduce 以便对其进行一些操作并将结果返回给根进程。

为此,我尝试了两种不同的方法。第一种方法是使用 MPI_Pack 将结构的元素连续打包到缓冲区中,然后在我的用户函数中解压缩它们。

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>

#define MAX_PACKING_BUF_SIZE 100

typedef struct {
    double x;
    int n;
} BE_Type;

int rank = -1;

void operation(void *invec, void *inoutvec, int *length, MPI_Datatype *type)
{
    uint8_t *buf  = (uint8_t*) invec;
    double *res     = (double*) inoutvec;
    int pos;
    BE_Type value;

    printf("[%d] len: %d\n", rank, *length);

    pos = 0;
    MPI_Unpack(buf, *length, &pos, &value.x, 1, MPI_DOUBLE, MPI_COMM_WORLD);
    MPI_Unpack(buf, *length, &pos, &value.n, 1, MPI_INT,    MPI_COMM_WORLD);

    printf("[%d] x: %lf, n: %d\n", rank, value.x, value.n);

    *res    += value.x;
}

int main (int argc, char *argv[])
{
    int rc, pos;
    int root    = 0;
    double res  = 0.0;
    MPI_Op my_op;
    BE_Type value;
    uint8_t buf[MAX_PACKING_BUF_SIZE];

    rc = MPI_Init(&argc,&argv);
    if (rc != MPI_SUCCESS) printf("ERROR\n");
    rc = MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rc != MPI_SUCCESS) printf("ERROR\n");
    rc = MPI_Op_create( (MPI_User_function*) operation, 1, &my_op);
    if (rc != MPI_SUCCESS) printf("ERROR\n");

    value.x = 1.0;
    value.n = rank;

    pos = 0;
    MPI_Pack(&value.x, 1, MPI_DOUBLE, buf, MAX_PACKING_BUF_SIZE, &pos, MPI_COMM_WORLD);
    MPI_Pack(&value.n, 1, MPI_INT,    buf, MAX_PACKING_BUF_SIZE, &pos, MPI_COMM_WORLD);

    rc = MPI_Reduce(buf, &res, pos, MPI_PACKED, my_op, root, MPI_COMM_WORLD);
    if (rc != MPI_SUCCESS) printf("ERROR\n");

    if (rank == root) {
        printf("res: %lf\n", res);
    }
}

但是,此代码会导致以下结果(启动 4 个进程):

[0] len: 12
[2] len: 12
[2] x: 1.000000, n: 3
[0] x: 1.000000, n: 1
[0] len: 12
[0] x: 2.000000, n: 2
res: 4.000000

所以首先我想知道为什么我的函数只被调用了三次而不是四次?其次(这是我的主要问题):为什么 x 值会在某一时刻发生变化?

有趣的是,使用我测试的第二种方法也改变了 x 值,它定义了一个新的数据类型:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>

typedef struct {
    double x;
    int n;
} BE_Type;

int rank = -1;

void operation(void *invec, void *inoutvec, int *length, MPI_Datatype *type)
{
    BE_Type *value  = (BE_Type*) invec;
    double *res     = (double*) inoutvec;

    printf("[%d] x: %lf, n: %d\n", rank, value->x, value->n);

    *res    += value->x;
}

int main (int argc, char *argv[])
{
    int rc, pos;
    int root    = 0;
    double res  = 0.0;
    MPI_Op my_op;
    BE_Type value;

    MPI_Datatype MPI_BE_Type;
    int blocklens[2];
    MPI_Aint indices[2];
    MPI_Datatype old_types[2];

    rc = MPI_Init(&argc,&argv);
    if (rc != MPI_SUCCESS) printf("ERROR\n");
    rc = MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rc != MPI_SUCCESS) printf("ERROR\n");
    rc = MPI_Op_create( (MPI_User_function*) operation, 1, &my_op);
    if (rc != MPI_SUCCESS) printf("ERROR\n");

    blocklens[0] = 1;
    blocklens[1] = 1;
    old_types[0] = MPI_DOUBLE;
    old_types[1] = MPI_INT;
    MPI_Address(&value.x, &indices[0]);
    MPI_Address(&value.n, &indices[1]);
    indices[1] = indices[1] - indices[0];
    indices[0] = 0;
    MPI_Type_struct(2, blocklens, indices, old_types, &MPI_BE_Type);
    MPI_Type_commit(&MPI_BE_Type);

    value.x = 1.0;
    value.n = rank;

    rc = MPI_Reduce(&value, &res, 1, MPI_BE_Type, my_op, root, MPI_COMM_WORLD);
    if (rc != MPI_SUCCESS) printf("ERROR\n");

    if (rank == root) {
        printf("res: %lf\n", res);
    }
}

结果(已启动 4 个进程):

[2] x: 1.000000, n: 3
[0] x: 1.000000, n: 1
[0] x: 2.000000, n: 2
res: 4.000000

所以我想我只是误解了某些东西或错误地使用了它。每一个帮助都是值得赞赏的。谢谢!

标签: cmpimpich

解决方案


所以首先我想知道为什么我的函数只被调用了三次而不是四次?

将 N 个值相加,需要 N-1 个加法。这同样适用于任何操作。无论您如何重新排列它们。

其次(这是我的主要问题):为什么 x 值会在某一时刻发生变化?

该操作作为树(通常是二叉树)执行。在您的情况下,它看起来像这样:

Ranks   0 1 2 3

        1 1 1 1
        |/  | /
        +   +
        2   2
        |  /
        | /
        +
        4

操作必须始终是关联的,因此这是计算结果的有效方法。

我不会真正推荐在自定义缩减操作中打包/解包的第一种方式。无论如何:您的归约函数必须同时 in使用, 和inoutas BE_Type!该功能还必须适用于任何length. 所以它可能看起来像:

void operation(void *invec, void *inoutvec, int *length, MPI_Datatype *type)
{
    BE_Type *value = (BE_Type*) invec;
    BE_Type *res   = (BE_Type*) inoutvec;

    for (int i = 0; i < *length; i++) {
        res[i].x += value[i].x;
        // should probably do something with n
    }
}

推荐阅读