首页 > 解决方案 > parallel computing with foreach Rcpp and RNG much slower than expected

问题描述

I have an Rcpp-function that outputs a large matrix which I want to save as an R object. My idea was to speed things up using my Rcpp function in parallel with the package foreach.

For the same matrix size, using foreach takes about more than five times as long on my Windows machine as just running the function without foreach (exclduing the set up of the workers). I am aware of the issues surrounding the execution of very small tasks in parallel (e.g. Why is the parallel package slower than just using apply?). Also I am willing to leave aside theoretical problems with running Random Number Generators in parallel as the results might no longer be truly random.

As my subtasks should be large enough, apparently the Rcpp-function I wrote does not work well in parallel, but I do not know why. Is using an RNG in the Rcpp-function simply a task that cannot be parallelized? Apart from that: is there an optimal i and with that an optimal ncol (here n_bootstrap)of my submatrices in foreach? Any help is much aprreciated. Also please feel free to comment on the code in general if you like.

Clarification: I compiled a package and use mypackage::funC within foreach

Here is an example code in R:

y <- funC(n_bootstrap = 250, n_obs_censusdata = 300000,
          locationeffects = as.numeric(1:200), 
          residuals = as.numeric(1:20000),
          X = matrix(as.numeric(1:3000000), ncol = 10), 
          beta_sample = matrix(as.numeric(1:2500), ncol = 250))

in parallel:

no_cores <- parallel::detectCores() - 2
cl <- parallel::makeCluster(no_cores)
doParallel::registerDoParallel(cl)

y <- foreach(i=1:5, .combine = "cbind") %dopar% {

  funC(n_bootstrap = 50,
       n_obs_censusdata = 300000, locationeffects = as.numeric(1:200), 
       residuals = as.numeric(1:20000), 
       X = matrix(as.numeric(1:3000000), ncol = 10), 
       beta_sample = matrix(as.numeric(1:2500), ncol = 250))
                       }
parallel::stopCluster(cl)

added: with bigstatsr

y <- bigstatsr::FBM(nrow = 300000, ncol = 250, type = "double")
bigstatsr::big_apply(y, a.FUN = function(y, ind, fun) {
          y[, ind] <- fun(n_bootstrap = length(ind),
                                    n_obs_censusdata = 300000,
                                    locationeffects = as.numeric(1:200),
                                    residuals = as.numeric(1:20000),
                                    X = matrix(as.numeric(1:3000000), ncol = 10), 
                                    beta_sample =  matrix(as.numeric(1:2500), ncol = 250))
          NULL
        }, a.combine = 'c', ncores = bigstatsr::nb_cores(), fun = funC)+

here is the Rcpp-code:

// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-

#include <RcppEigen.h>
#include <random>

using namespace Rcpp;
// [[Rcpp::depends(RcppEigen)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::export]]
SEXP funC(const int n_bootstrap,
          const int n_obs_censusdata, 
          const Eigen::Map<Eigen::VectorXd> locationeffects, 
          const Eigen::Map<Eigen::VectorXd> residuals,
          const Eigen::Map<Eigen::MatrixXd> X, 
          const Eigen::Map<Eigen::MatrixXd> beta_sample)
{

  // --------- create random sample of locations and of residuals --------- //

    // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  std::mt19937 gen(rd()); // Mersenne Twister engine 

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

  std::uniform_int_distribution<> distrloc(1, upperlocation);
  std::uniform_int_distribution<> distrres(1, upperresiduals);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  for (int i=0; i<n_obs_censusdata; ++i)
    for (int j=0; j<n_bootstrap; j++)
      LocationEffectResiduals(i,j) = locationeffects[distrloc(gen)-1] + residuals[distrres(gen)-1]; // subtract 1 because in C++ indices start with 0

  // ----- create Xbeta ------- //
    Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
    Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return Rcpp::wrap(returnmatrix);
}

标签: rforeachrcpp

解决方案


在这里,您要创建一个大型矩阵。原则上可以将其分配给多个进程,但最终要承担合并结果的成本。我建议在这里使用“共享内存并行”。我使用此处的 OpenMP 代码作为算法并行版本的起点:

// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::depends(dqrng)]]
#include <xoshiro.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>
// [[Rcpp::plugins(cpp11)]]
#include <random>

// [[Rcpp::export]]
Eigen::MatrixXd funD(const int n_bootstrap,
                     const int n_obs_censusdata, 
                     const Eigen::Map<Eigen::VectorXd> locationeffects, 
                     const Eigen::Map<Eigen::VectorXd> residuals,
                     const Eigen::Map<Eigen::MatrixXd> X, 
                     const Eigen::Map<Eigen::MatrixXd> beta_sample,
                     int ncores) {

  // --------- create random sample of locations and of residuals --------- //

  // initialise random seeds 
  std::random_device rd; // used to obtain a seed for the number engine
  dqrng::xoshiro256plus gen(rd());

  // initialize distributions for randam locations and residuals
  const int upperlocation = locationeffects.size();
  const int upperresiduals = residuals.size();

   // subtract 1 because in C++ indices start with 0
  std::uniform_int_distribution<> distrloc(0, upperlocation - 1);
  std::uniform_int_distribution<> distrres(0, upperresiduals - 1);

  // initialize and fill matrix for randam locations and residuals 
  Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);

  #pragma omp parallel num_threads(ncores)
  {
    dqrng::xoshiro256plus lgen(gen);      // make thread local copy of rng 
    lgen.jump(omp_get_thread_num() + 1);  // advance rng by 1 ... ncores jumps 

    #pragma omp for
    for (int i=0; i<n_obs_censusdata; ++i)
      for (int j=0; j<n_bootstrap; j++)
        LocationEffectResiduals(i,j) = locationeffects[distrloc(lgen)] + residuals[distrres(lgen)];
  }  

  // ----- create Xbeta ------- //
  Eigen::MatrixXd Xbeta = X * beta_sample;

  // ----- combine results ------- //
  Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;

  return returnmatrix;
}

在我的双核 Linux 系统上,我funD的 withncores = 1比你的稍快funC,可能是因为使用的 RNG 更快。有了ncores = 2它,它又增加了 30-40%。不错,因为并非所有代码都是并行执行的。我不知道这些天在 Windows 上的 OpenMP 性能有多好。改用它可能有意义RcppParallel。但这需要对您的代码进行更多更改。

上述代码的来源是Rcpp::sourceCpp(). 当你把它放入一个包中时,你应该使用

CXX_STD = CXX11
PKG_CXXFLAGS = $(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS = $(SHLIB_OPENMP_CXXFLAGS)

Makevars(.win). 请注意,根据WRE,如果 C++11 与 C++98 使用不同的编译器,这可能不会像预期的那样工作。IIRC Solaris 是唯一在默认配置中出现这种情况的平台。所以对于内部包你应该没问题。


推荐阅读