r - 如何使用future_lapply和data.table循环读取巨大的csv文件夹并返回汇总表
问题描述
我的硬盘上存储了一个包含 10,000 多个 csv 文件的文件夹。每个 csv 都针对一个物种,并在栅格单元中存在(如果物种存在于地球上的每个单元格中,则超过 500 万个单元格)。
我需要读取每个文件并使用 dplyr 加入其他数据帧并进行汇总,然后返回摘要 df。我没有服务器来运行它,它使我的桌面停滞不前。它适用于 17 种 csvs 的子集,但即便如此它也很慢。
这类似于其他一些关于处理大数据的问题,但我无法弄清楚 data.table、bigmemory 和 future 等包的正确组合。我认为真正慢的部分是 dplyr 命令,而不是读取文件,但我不确定。
我不确定这是否可以在没有文件的情况下回答,但它们很大,所以不知道如何使它可重现?
spp_ids <- <vector of the species ids, in this case 17 of them>
spp_list <- <datafame with ids of the 17 spp in the folder>
spp_info <- <dataframe with the species id and then some other columns>
cellid_df <- <big df with 5 million+ cell ids and corresponding region names>
# Loop
spp_regions <- future_lapply(spp_ids, FUN = function(x) {
csv_file <- file.path("//filepathtoharddrivefolder",
sprintf('chrstoremove_%s.csv', x)) # I pull just the id number from the file names
# summarise number of regions and cells
spp_region_summary <- data.table::fread(csv_file, sep = ",") %>%
dplyr::mutate(spp_id = x) %>%
dplyr::filter(presence == 1) %>% # select cell ids where the species is present
dplyr::left_join(cellid_df, by = "cell_id") %>%
dplyr::group_by(region, spp_id) %>%
dplyr::summarise(num_cells = length(presence)) %>%
dplyr::ungroup()
# add some additional information
spp_region_summary <- spp_region_summary %>%
dplyr::left_join(spp_info, by = "spp_id") %>%
dplyr::left_join(spp_list, by = "spp_id") %>%
dplyr::select(region, spp_id, num_cells)
return(spp_region_summary)
})
spp_regions_df <- dplyr::bind_rows(spp_regions)
fwrite(spp_regions_df,"filepath.csv")
以前没有处理过这么多数据,所以我从来不用离开 tidyverse!
解决方案
我试图重现这一点。cellid_df
我为每个单独的文件生成了 1000 万行。15 个“文件”只用了大约 40 秒(使用 reprex 增加了 20 秒)。
如果您可以让您的笔记本电脑运行半天左右,那么应该这样做。
几个建议:
如果您担心内存问题,可以写入文件。
由于
spp_id
在每次迭代中都是唯一的,因此您可以在合并后添加它。这将节省一些时间。“附加信息”可以加入到最终数据帧,因为它是键控的
spp_id
。在data.table
,left_join(X,Y,by='id')
将成为Y[X,on='id']
library(data.table)
spp_ids <- 1:15
set.seed(123)
N <- 1e7 # number of cell_ids
# Dummy cell ids + regions
cellid_df <- data.table(cell_id=1:N,region=sample(state.abb,N,replace = T))
head(cellid_df)
#> cell_id region
#> 1: 1 NM
#> 2: 2 IA
#> 3: 3 IN
#> 4: 4 AZ
#> 5: 5 TN
#> 6: 6 WY
#
outfile <- 'test.csv'
if(file.exists(outfile))
file.remove(outfile)
a=Sys.time()
l<- lapply(spp_ids, function(x){
#Generate random file with cell_id and presence
spp_file <- data.table(cell_id=1:N,presence=round(runif(N)))
present_cells <- cellid_df[spp_file[presence==1],on='cell_id'] # Filter and merge
spp_region_summary <- present_cells[,.(spp_id=x,num_cells=.N),by=.(region)] # Summarise and add
setcolorder(spp_region_summary,c('spp_id','region','num_cells')) # Reorder the columns if you want
fwrite(spp_region_summary,outfile,append = file.exists(outfile)) # Write the summary to disk to avoid memory issues
# If you want to keep it in memory, you can return it and use rbindlist
# spp_region_summary
})
b=Sys.time()
b-a
#> Time difference of 1.019157 mins
# Check lines in file = (No of species) x (No of regions) + 1
R.utils::countLines(outfile)
#> Registered S3 method overwritten by 'R.oo':
#> method from
#> throw.default R.methodsS3
#> [1] 751
#> attr(,"lastLineHasNewline")
#> [1] TRUE
由reprex 包(v0.3.0)于 2019 年 12 月 20 日创建
推荐阅读
- python - 根据具有条件的其他列值创建新列
- google-sheets - 部分匹配一个范围的字符串与另一个范围的字符串,输出部分匹配
- php - 从 Laravel 中的关系中获取最后的记录
- apache-kafka - 在 Kaka Producer 中配置重试的最佳方式
- javascript - Webassembly.instantiate 期间如何定义最大内存?
- c# - 可空引用类型在具有临时可空字段的类中的使用
- memory-leaks - 如何阻止“com.newrelic.agent.Transaction”中的内存泄漏
- rest - 如何从 REST API 访问嵌套的 json?
- shell - 如何使用 Jenkins 在 Shell Script 中执行 curl 命令?
- mapbox - Mapbox GL-JS:如何为图标实例添加自定义图像?