r - 并行计算,在 dplyr 中哪个替代 tidyr::complete?
问题描述
我正在尝试并行化管道。在管道中有一个 tidyr 命令(“tidyr::complete”)。一旦并行运行,这就会破坏代码,因为无法识别对象类。
dplyr 中是否有替代方法可以完成?
library(dplyr)
library(tidyr)
library(zoo)
test <- tibble(year=c(1,2,3,4,5,5,1,4,5),
var_1=c(1,1,1,1,1,1,2,2,2),
var_2=c(1,1,1,1,1,2,3,3,3),
var_3=c(0,5,NA,15,20,NA,1,NA,NA))
max_year <- max(test$year,na.rm = T)
min_year <- min(test$year,na.rm = T)
串行
test_serial <- test %>%
group_by(var_1,var_2) %>%
complete(var_1, year = seq(min_year,max_year)) %>%
mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))
并行(失败)
devtools::install_github("hadley/multidplyr")
library(multidplyr)
cl <- new_cluster(2)
cluster_copy(cl, c("test","max_year","min_year"))
cluster_library(cl, c("dplyr","tidyr","zoo"))
test_parallel <- test %>% group_by(var_1,var_2) %>% partition(cl)
test_parallel <- test_parallel %>%
dplyr::group_by(var_1,var_2) %>%
tidyr::complete(var_1, year = seq(min_year,max_year)) %>%
dplyr::mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE)) %>%
collect()
这是错误信息
Error in UseMethod("complete_") :
no applicable method for 'complete_' applied to an object of class "multidplyr_party_df"
解决方案
Multidplyr 允许您:
- 使用拆分数据
partition()
- 在专用节点上处理每个分区
collect()
结果
所有数据处理任务都不适用于之前的工作流程。
特别是,complete
需要知道输入数据中所有可能的值才能创建丢失的行,这意味着这个操作作为一个整体是无法拆分的,这就是为什么没有可用的方法可用的原因。
在您提供的示例中,每个节点将收到一var_1, var_2
对而不知道其他节点得到了什么,这不允许并行实现预期结果。
但是,正如您已经知道的那样year = seq(min_year,max_year)
,您可以仅并行化complete
此变量的任务,将任务拆分var_1
为 ,例如使用furrr
包:
library(furrr)
plan(multiprocess)
test_parallel <- test %>%
group_by(var_1,var_2) %>%
complete(var_1) %>% split(.$var_1) %>%
furrr::future_map(~{
complete(.x, year = seq(min_year,max_year)) %>%
dplyr::mutate(
var_3 = na.approx(var_3,na.rm = FALSE),
var_3 = if(all(is.na(var_3))) NA else na.spline(var_3,na.rm = FALSE))
}) %>% bind_rows()
> identical(c(test_serial$var_1,test_serial$var_2,test_serial$var_3,test_serial$year),
+ c(test_parallel$var_1,test_parallel$var_2,test_parallel$var_3,test_parallel$year))
[1] TRUE
在更大的数据集上进行测试以衡量潜在的性能改进。
推荐阅读
- angular - 首先删除一些行并在删除后插入一些其他行的函数的问题
- javascript - 如何使用 javascript 和 html 显示特定的内容时区
- javascript - 将两个减速器合并为一个的最佳方法是什么?
- c# - 尝试执行 SQL 查询时出现无效对象错误
- javascript - 光滑的滑块应该只显示 4 张幻灯片。但是,第 5 个滑块的一部分显示不应该显示
- python - 使用 google cloud sdk python 使缓存的内容无效
- apache-kafka - kafka sink 连接器无法连接到 Couchbase
- angular - 使用 Angular Library Bundle 时 Jsonpath 不起作用
- javascript - 如何在使用(基于 webpack)ZURB 基础框架时通过 jQuery AJAX 从 PHP 脚本中检索数据?
- javascript - 如果可能,使用 map 和 reduce 对动态 JSON 键进行分组 javascript