首页 > 解决方案 > 使用 sparklyr 运行并行函数调用

问题描述

目前,我正在使用 doparallel 库中的 foreach 循环在同一台机器的多个内核上并行运行函数调用,如下所示:

out_results=foreach(i =1:length(some_list))%dopar%
{
   out=functions_call(some_list[[i]])
   return(out)
}

这个some_list是一个数据帧列表,每个数据帧会有不同的列数,function_call() 是一个函数,它对数据执行多项操作,例如数据操作,然后使用随机森林进行变量选择,最后执行最小二乘拟合。变量out又是 3 个数据框的列表,out_results将是列表的列表。我在函数调用中使用 CRAN 库和我创建的一些自定义库,我想避免使用 spark ML 库,因为它们的功能有限并且需要重写整个代码。

我想利用 spark 并行运行这些函数调用。有可能这样做吗?如果是的话,我应该考虑哪个方向。我已经阅读了 sparklyr 的很多文档,但它似乎没有多大帮助,因为那里提供的示例非常简单。

标签: rapache-sparkparallel-processingsparkrsparklyr

解决方案


SparklyR 的主页给出了分布在 Spark 集群上的任意 R 代码示例。特别是,请参阅他们的分组操作示例。

您的主要结构应该是一个数据框,您将对其进行逐行处理。可能类似于以下内容(未经测试):

some_list = list(tibble(a=1[0]), tibble(b=1), tibble(c=1:2))
all_data = tibble(i = seq_along(some_list), df = some_list)

# Replace this with your actual code. 
# Should get one dataframe and produce one dataframe. 
# Embedded dataframe columns are OK
transform_one = function(df_wrapped) {
  # in your example, you expect only one record per group
  stopifnot(nrow(df_wrapped)==1)
  df = df_wrapped$df
  
  res0 = df
  res1 = tibble(x=10)
  res2 = tibble(y=10:11)
  
  return(tibble(res0 = list(res0), res1 = list(res1), res2 = list(res2)))
}

all_data %>% spark_apply(
  transform_one,
  group_by = c("i"), 
  columns = c("res0"="list", "res1"="list", "res2"="list"),
  packages = c("randomForest", "etc")
)

总而言之,这种方法看起来很不自然,就好像我们在一个不适合的任务上强制使用 Spark。也许您应该检查另一个并行化框架?


推荐阅读