Parallelize a loop with updates during each iteration

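Problem description

I have some R code that assembles demographic data from the Census for all US states into a list object. With roughly 11 million blocks, the block-level code can take about a week to run as a sequential loop, so I am trying to parallelize the loop over states to make it faster. I have accomplished this with: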

# census_geo_api() is assumed to come from the wru package; load it first.
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
            "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
            "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
            "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
            "VT","VA","WA","WV","WI","WY","DC","PR")
library(future.apply)
# plan(multiprocess) is deprecated in recent versions of the future package;
# multisession matches the MultisessionFuture named in the error below.
plan(multisession)
ptm <- proc.time()
CensusObj_block_age_sex <- list()

CensusObj_block_age_sex[states] <- future_lapply(states, function(s) {
  county <- census_geo_api(key = "XXX", state = s, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = s, geo = "tract",  age = TRUE, sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = s, geo = "block",  age = TRUE, sex = TRUE)
  # Return the per-state list directly rather than assigning into censusObj,
  # which is not defined anywhere in this snippet.
  list(state = s, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)
})

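However, I need to make it more robust. The Census API sometimes has problems, so I would like to update CensusObj after each state iteration so that completed data is not lost when something goes wrong. That way, if something fails (for example, if I misspell "WY" as "WU"), I can restart the loop on the remaining states.

Is it possible to achieve this somehow? I am open to other parallelization approaches.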


The code above runs, but it seems to run into memory problems:

Error: Failed to retrieve the value of MultisessionFuture (future_lapply-3) from cluster RichSOCKnode #3 (PID 80363 on localhost ‘localhost’). The reason reported was ‘vector memory exhausted (limit reached?)’. Post-mortem diagnostic: A process with this PID exists, which suggests that the localhost worker is still alive.

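I have R_MAX_VSIZE = 8Gb in my .Renviron, but I am not sure how that is divided among the 8 cores on my machine. All of this suggests that I need to store the results of each iteration rather than trying to keep everything in memory and then append the objects together at the end.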

Tags: r, parallel-processing

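Solution

Here is a solution using doParallel (configured with the options for UNIX systems, although it can also be used on Windows) and foreach. It stores the results for each state in a separate file, then reads the individual files back in and combines them into a single list.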

library(doParallel)
library(foreach)

path_results <- "my_path"
# Make sure the output directory exists before the workers write to it
dir.create(path_results, showWarnings = FALSE, recursive = TRUE)

ncpus <- 8L
registerDoParallel(cores = ncpus)
states <- c("AL","AK","AZ","AR","CA","CO","CT","DE","FL","GA","HI",
            "ID","IL","IN","IA","KS","KY","LA","ME","MD","MA","MI",
            "MN","MS","MO","MT","NE","NV","NH","NJ","NM","NY","NC",
            "ND","OH","OK","OR","PA","RI","SC","SD","TN","TX","UT",
            "VT","VA","WA","WV","WI","WY","DC","PR")
results <- foreach(state = states) %dopar% {
  county <- census_geo_api(key = "XXX", state = state, geo = "county", age = TRUE, sex = TRUE)
  tract  <- census_geo_api(key = "XXX", state = state, geo = "tract",  age = TRUE, sex = TRUE)
  block  <- census_geo_api(key = "XXX", state = state, geo = "block",  age = TRUE, sex = TRUE)
  res <- list(state = state, age = TRUE, sex = TRUE, block = block, tract = tract, county = county)

  # store the results as rds, one file per state
  saveRDS(res, file = file.path(path_results, paste0(state, ".Rds")))

  # remove the large objects and free the worker's memory
  rm(county, tract, block, res)
  gc()

  # just return a string, so nothing large is sent back to the main process
  paste0("done with ", state)
}
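
The question also asks for a way to restart after a failure without losing finished states. One possible way to get that from the per-state files is sketched below: skip any state whose file already exists, and catch errors so a single bad state code does not stop the others. The helper `fetch_state_checkpointed()` and the stub `stub_fetch()` are hypothetical names introduced for illustration; `stub_fetch()` stands in for the three `census_geo_api()` calls so the sketch runs without an API key.

```r
# Hypothetical helper: fetch one state unless its result file already exists.
# `fetch_fun` stands in for the census_geo_api() calls used in the answer.
fetch_state_checkpointed <- function(state, path_results, fetch_fun) {
  out_file <- file.path(path_results, paste0(state, ".Rds"))
  if (file.exists(out_file)) {
    return(paste0("skipped ", state))
  }
  tryCatch({
    res <- fetch_fun(state)
    # Write to a temporary file first, then move it into place, so an
    # interrupted write does not leave a half-written .Rds behind.
    tmp_file <- paste0(out_file, ".tmp")
    saveRDS(res, file = tmp_file)
    file.rename(tmp_file, out_file)
    paste0("done with ", state)
  }, error = function(e) {
    paste0("failed ", state, ": ", conditionMessage(e))
  })
}

# Demo with a stub instead of the real API; "WU" simulates the typo.
demo_path <- file.path(tempdir(), "census_checkpoint_demo")
unlink(demo_path, recursive = TRUE)
dir.create(demo_path, showWarnings = FALSE, recursive = TRUE)

stub_fetch <- function(state) {
  if (!state %in% c("WY", "DC")) stop("unknown state code")
  list(state = state, age = TRUE, sex = TRUE)
}

# First run: the misspelled state fails, the valid one is saved.
first_run <- vapply(c("WU", "DC"), fetch_state_checkpointed, character(1),
                    path_results = demo_path, fetch_fun = stub_fetch)
# Second run with the typo fixed: only the missing state is fetched.
second_run <- vapply(c("WY", "DC"), fetch_state_checkpointed, character(1),
                     path_results = demo_path, fetch_fun = stub_fetch)
print(first_run)
print(second_run)
```

The same helper body could be placed inside the `foreach(...) %dopar%` loop or a `future_lapply()` call; `vapply()` is used here only to keep the demo free of extra dependencies.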

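library(purrr)
# combine the results to a list
# Names are taken from the file names: list.files() returns files in
# alphabetical order, which differs from the order of `states`.
result_files <- list.files(path = path_results, pattern = "\\.Rds$")
CensusObj_block_age_sex <- set_names(result_files, tools::file_path_sans_ext(result_files)) %>%
  map(~ readRDS(file = file.path(path_results, .x)))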
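
Because each state ends up in its own file, the states that still need to be fetched can be worked out by comparing the requested codes with the files on disk. The sketch below shows that comparison on a small demo directory; `states_demo` and `demo_missing_path` are hypothetical names used only for this example.

```r
# Demo directory holding results for only some of the requested states.
states_demo <- c("AL", "AK", "AZ", "WY")
demo_missing_path <- file.path(tempdir(), "census_missing_demo")
unlink(demo_missing_path, recursive = TRUE)
dir.create(demo_missing_path, recursive = TRUE)
for (s in c("AL", "AZ")) {
  saveRDS(list(state = s), file = file.path(demo_missing_path, paste0(s, ".Rds")))
}

# States that already have a result file, derived from the file names
finished <- tools::file_path_sans_ext(
  list.files(demo_missing_path, pattern = "\\.Rds$")
)
# States still to be fetched, in the original request order
remaining <- setdiff(states_demo, finished)
print(remaining)
```

Passing `remaining` instead of the full `states` vector to the parallel loop would restart the job on the unfinished states only.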
