r - 使用 sparklyr 运行并行函数调用
问题描述
目前,我正在使用 doparallel 库中的 foreach 循环在同一台机器的多个内核上并行运行函数调用,如下所示:
out_results=foreach(i =1:length(some_list))%dopar%
{
out=functions_call(some_list[[i]])
return(out)
}
这个some_list是一个数据帧列表,每个数据帧会有不同的列数,function_call() 是一个函数,它对数据执行多项操作,例如数据操作,然后使用随机森林进行变量选择,最后执行最小二乘拟合。变量out又是 3 个数据框的列表,out_results将是列表的列表。我在函数调用中使用 CRAN 库和我创建的一些自定义库,我想避免使用 spark ML 库,因为它们的功能有限并且需要重写整个代码。
我想利用 spark 并行运行这些函数调用。有可能这样做吗?如果是的话,我应该考虑哪个方向。我已经阅读了 sparklyr 的很多文档,但它似乎没有多大帮助,因为那里提供的示例非常简单。
解决方案
SparklyR 的主页给出了分布在 Spark 集群上的任意 R 代码示例。特别是,请参阅他们的分组操作示例。
您的主要结构应该是一个数据框,您将对其进行逐行处理。可能类似于以下内容(未经测试):
some_list = list(tibble(a=1[0]), tibble(b=1), tibble(c=1:2))
all_data = tibble(i = seq_along(some_list), df = some_list)
# Replace this with your actual code.
# Should get one dataframe and produce one dataframe.
# Embedded dataframe columns are OK
transform_one = function(df_wrapped) {
# in your example, you expect only one record per group
stopifnot(nrow(df_wrapped)==1)
df = df_wrapped$df
res0 = df
res1 = tibble(x=10)
res2 = tibble(y=10:11)
return(tibble(res0 = list(res0), res1 = list(res1), res2 = list(res2)))
}
all_data %>% spark_apply(
transform_one,
group_by = c("i"),
columns = c("res0"="list", "res1"="list", "res2"="list"),
packages = c("randomForest", "etc")
)
总而言之,这种方法看起来很不自然,就好像我们在一个不适合的任务上强制使用 Spark。也许您应该检查另一个并行化框架?
推荐阅读
- django - 一个 Django 应用程序/模型到多个数据库
- javascript - 如何将键值对添加到处于功能组件状态的对象中
- python - 如何使用 Python 和 pexpect 在多个会话中运行 linux 可执行文件
- r - 使用 R 中的正参数输入优化函数
- xcode - 当文件位于多个文件夹中时,如何移动 XCode 项目?
- c++ - 为什么在此示例中 C++ 名称查找似乎不一致?
- amazon-web-services - AWS Glue Crawler 无需 Glue 作业即可将所有数据发送到 Glue Catalog 和 Athena
- angularjs - ChartistJS 时间显示不正确
- javascript - 与 html 同时创建的事件侦听器未触发
- android - 如何在没有 FocusChange 的情况下检测 EditText 上键入的开始和结束