r - 在每次迭代期间将循环与更新并行化
问题描述
我有一些 R 代码将美国所有州的人口普查中的人口统计数据汇总到一个列表对象中。由于有大约 11M 块,块级代码可能需要一周的时间才能作为顺序循环运行,因此我试图在状态上并行化循环以使其更快。我已经实现了这个目标:
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
plan(multiprocess)
ptm <- proc.time()
CensusObj_block_age_sex = list()
CensusObj_block_age_sex[states] <- future_lapply(states, function(s){
county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
tract <- census_geo_api(key = "XXX", state = s, geo = "tract", age = TRUE, sex = TRUE)
block <- census_geo_api(key = "XXX", state = s, geo = "block", age = TRUE, sex = TRUE)
censusObj[[s]] <- list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
}
)
但是,我需要使它更健壮。有时 Census API 会出现问题,因此我希望在每次状态迭代时更新 CensusObj,以便在出现问题时不会丢失已完成的数据。这样,如果出现问题,我可以在剩余状态上重新启动循环(例如,如果我将“WY”拼写为“WU”)
有可能以某种方式实现这一目标吗?我对其他并行化方法持开放态度。
上面的代码运行,但似乎遇到了内存问题:
Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.
我的 .Renviron 中有 R_MAX_VSIZE = 8Gb,但我不确定如何在我的机器上的 8 个内核之间进行划分。这一切都表明我需要存储每次迭代的结果,而不是尝试将其全部保存在内存中,然后在最后将对象附加在一起。
解决方案
这是一个使用的解决方案doParallel
(带有适用于 UNIX 系统的选项,但您也可以在 Windows 上使用它,请参见此处),foreach
它分别存储每个状态的结果,然后读入单个文件并将它们组合到一个列表中。
library(doParallel)
library(foreach)
path_results <- "my_path"
ncpus = 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
"ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
"MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
"ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
"VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
tract <- census_geo_api(key = "XXX", state = state, geo = "tract", age = TRUE, sex = TRUE)
block <- census_geo_api(key = "XXX", state = state, geo = "block", age = TRUE, sex = TRUE)
results <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
# store the results as rds
saveRDS(results,
file = paste0(path_results, "/", state, ".Rds"))
# remove the results
rm(county)
rm(tract)
rm(block)
rm(results)
gc()
# just return a string
paste0("done with ", state)
}
library(purrr)
# combine the results to a list
result_files <- list.files(path = path_results)
CensusObj_block_age_sex <- set_names(result_files, states) %>%
map(~ readRDS(file = paste0(path_results, "/", .x)))
推荐阅读
- python - 如何计算熊猫中每个 p1 id 下的所有孩子的数量?id 和 parent id 已给出
- python - python - 从 python 脚本启动的跟踪进程
- android - Android Paint.setStrokeCap(Paint.Cap.ROUND) 性能问题
- macos - 禁用 Jupyter Notebook 单元格搜索
- android - 如何将套接字移动扫描仪与 android 应用程序集成并检索结果
- bash - 从环境变量展开变量集
- arrays - 这个使用范围的简单 For 循环如何以及为什么在 Go 中打印一个简单的 3D 数组?
- json - Applescript JSON 提取
- regex - Selenium IDE - 如何将正则表达式与命令“断言文本”一起使用?
- c++ - Change in number of button cause alignment issue in WxWidgets for Windows