r - parallel computing with foreach Rcpp and RNG much slower than expected
问题描述
I have an Rcpp-function that outputs a large matrix which I want to save as an R object. My idea was to speed things up using my Rcpp function in parallel with the package foreach.
For the same matrix size, using foreach takes about more than five times as long on my Windows machine as just running the function without foreach (exclduing the set up of the workers). I am aware of the issues surrounding the execution of very small tasks in parallel (e.g. Why is the parallel package slower than just using apply?). Also I am willing to leave aside theoretical problems with running Random Number Generators in parallel as the results might no longer be truly random.
As my subtasks should be large enough, apparently the Rcpp-function I wrote does not work well in parallel, but I do not know why. Is using an RNG in the Rcpp-function simply a task that cannot be parallelized? Apart from that: is there an optimal i and with that an optimal ncol (here n_bootstrap)of my submatrices in foreach? Any help is much aprreciated. Also please feel free to comment on the code in general if you like.
Clarification: I compiled a package and use mypackage::funC within foreach
Here is an example code in R:
y <- funC(n_bootstrap = 250, n_obs_censusdata = 300000,
locationeffects = as.numeric(1:200),
residuals = as.numeric(1:20000),
X = matrix(as.numeric(1:3000000), ncol = 10),
beta_sample = matrix(as.numeric(1:2500), ncol = 250))
in parallel:
no_cores <- parallel::detectCores() - 2
cl <- parallel::makeCluster(no_cores)
doParallel::registerDoParallel(cl)
y <- foreach(i=1:5, .combine = "cbind") %dopar% {
funC(n_bootstrap = 50,
n_obs_censusdata = 300000, locationeffects = as.numeric(1:200),
residuals = as.numeric(1:20000),
X = matrix(as.numeric(1:3000000), ncol = 10),
beta_sample = matrix(as.numeric(1:2500), ncol = 250))
}
parallel::stopCluster(cl)
added: with bigstatsr
y <- bigstatsr::FBM(nrow = 300000, ncol = 250, type = "double")
bigstatsr::big_apply(y, a.FUN = function(y, ind, fun) {
y[, ind] <- fun(n_bootstrap = length(ind),
n_obs_censusdata = 300000,
locationeffects = as.numeric(1:200),
residuals = as.numeric(1:20000),
X = matrix(as.numeric(1:3000000), ncol = 10),
beta_sample = matrix(as.numeric(1:2500), ncol = 250))
NULL
}, a.combine = 'c', ncores = bigstatsr::nb_cores(), fun = funC)+
here is the Rcpp-code:
// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; indent-tabs-mode: nil; -*-
#include <RcppEigen.h>
#include <random>
using namespace Rcpp;
// [[Rcpp::depends(RcppEigen)]]
// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::export]]
SEXP funC(const int n_bootstrap,
const int n_obs_censusdata,
const Eigen::Map<Eigen::VectorXd> locationeffects,
const Eigen::Map<Eigen::VectorXd> residuals,
const Eigen::Map<Eigen::MatrixXd> X,
const Eigen::Map<Eigen::MatrixXd> beta_sample)
{
// --------- create random sample of locations and of residuals --------- //
// initialise random seeds
std::random_device rd; // used to obtain a seed for the number engine
std::mt19937 gen(rd()); // Mersenne Twister engine
// initialize distributions for randam locations and residuals
const int upperlocation = locationeffects.size();
const int upperresiduals = residuals.size();
std::uniform_int_distribution<> distrloc(1, upperlocation);
std::uniform_int_distribution<> distrres(1, upperresiduals);
// initialize and fill matrix for randam locations and residuals
Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);
for (int i=0; i<n_obs_censusdata; ++i)
for (int j=0; j<n_bootstrap; j++)
LocationEffectResiduals(i,j) = locationeffects[distrloc(gen)-1] + residuals[distrres(gen)-1]; // subtract 1 because in C++ indices start with 0
// ----- create Xbeta ------- //
Eigen::MatrixXd Xbeta = X * beta_sample;
// ----- combine results ------- //
Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;
return Rcpp::wrap(returnmatrix);
}
解决方案
在这里,您要创建一个大型矩阵。原则上可以将其分配给多个进程,但最终要承担合并结果的成本。我建议在这里使用“共享内存并行”。我使用此处的 OpenMP 代码作为算法并行版本的起点:
// [[Rcpp::depends(RcppEigen)]]
#include <RcppEigen.h>
// [[Rcpp::depends(dqrng)]]
#include <xoshiro.h>
// [[Rcpp::plugins(openmp)]]
#include <omp.h>
// [[Rcpp::plugins(cpp11)]]
#include <random>
// [[Rcpp::export]]
Eigen::MatrixXd funD(const int n_bootstrap,
const int n_obs_censusdata,
const Eigen::Map<Eigen::VectorXd> locationeffects,
const Eigen::Map<Eigen::VectorXd> residuals,
const Eigen::Map<Eigen::MatrixXd> X,
const Eigen::Map<Eigen::MatrixXd> beta_sample,
int ncores) {
// --------- create random sample of locations and of residuals --------- //
// initialise random seeds
std::random_device rd; // used to obtain a seed for the number engine
dqrng::xoshiro256plus gen(rd());
// initialize distributions for randam locations and residuals
const int upperlocation = locationeffects.size();
const int upperresiduals = residuals.size();
// subtract 1 because in C++ indices start with 0
std::uniform_int_distribution<> distrloc(0, upperlocation - 1);
std::uniform_int_distribution<> distrres(0, upperresiduals - 1);
// initialize and fill matrix for randam locations and residuals
Eigen::MatrixXd LocationEffectResiduals(n_obs_censusdata, n_bootstrap);
#pragma omp parallel num_threads(ncores)
{
dqrng::xoshiro256plus lgen(gen); // make thread local copy of rng
lgen.jump(omp_get_thread_num() + 1); // advance rng by 1 ... ncores jumps
#pragma omp for
for (int i=0; i<n_obs_censusdata; ++i)
for (int j=0; j<n_bootstrap; j++)
LocationEffectResiduals(i,j) = locationeffects[distrloc(lgen)] + residuals[distrres(lgen)];
}
// ----- create Xbeta ------- //
Eigen::MatrixXd Xbeta = X * beta_sample;
// ----- combine results ------- //
Eigen::MatrixXd returnmatrix = Xbeta + LocationEffectResiduals;
return returnmatrix;
}
在我的双核 Linux 系统上,我funD
的 withncores = 1
比你的稍快funC
,可能是因为使用的 RNG 更快。有了ncores = 2
它,它又增加了 30-40%。不错,因为并非所有代码都是并行执行的。我不知道这些天在 Windows 上的 OpenMP 性能有多好。改用它可能有意义RcppParallel
。但这需要对您的代码进行更多更改。
上述代码的来源是Rcpp::sourceCpp()
. 当你把它放入一个包中时,你应该使用
CXX_STD = CXX11
PKG_CXXFLAGS = $(SHLIB_OPENMP_CXXFLAGS)
PKG_LIBS = $(SHLIB_OPENMP_CXXFLAGS)
在Makevars(.win)
. 请注意,根据WRE,如果 C++11 与 C++98 使用不同的编译器,这可能不会像预期的那样工作。IIRC Solaris 是唯一在默认配置中出现这种情况的平台。所以对于内部包你应该没问题。
推荐阅读
- sonarqube - 使用 SonarQube Web API 检索条件覆盖信息
- mysql - SQL 为每个 userID 记录计数一个值,除非 userId 的一个记录值为 x
- python - 从具有精确单词匹配的句子列表中获取句子:Python
- javascript - 选择加选项下拉列表播放音频文件
- facebook - Facebook 返回“此网页包含被阻止的 URL”
- javascript - 视频播放,然后由带有链接的静止图像覆盖 onended()
- r - 如何检查函数是否在 R 的向量中?
- r - 从字符串中提取元素
- android - 如何在片段的tablayout中执行onCLick Listener?
- vba - Excel VBA 宏编译错误:预期结束子