首页 > 解决方案 > 如何使用future_lapply和data.table循环读取巨大的csv文件夹并返回汇总表

问题描述

我的硬盘上存储了一个包含 10,000 多个 csv 文件的文件夹。每个 csv 都针对一个物种,并在栅格单元中存在(如果物种存在于地球上的每个单元格中,则超过 500 万个单元格)。

我需要读取每个文件并使用 dplyr 加入其他数据帧并进行汇总,然后返回摘要 df。我没有服务器来运行它,它使我的桌面停滞不前。它适用于 17 种 csvs 的子集,但即便如此它也很慢。

这类似于其他一些关于处理大数据的问题,但我无法弄清楚 data.table、bigmemory 和 future 等包的正确组合。我认为真正慢的部分是 dplyr 命令,而不是读取文件,但我不确定。

我不确定这是否可以在没有文件的情况下回答,但它们很大,所以不知道如何使它可重现?

spp_ids <- <vector of the species ids, in this case 17 of them>

spp_list <- <datafame with ids of the 17 spp in the folder>

spp_info <- <dataframe with the species id and then some other columns>

cellid_df <- <big df with 5 million+ cell ids and corresponding region names>

# Loop
spp_regions <- future_lapply(spp_ids, FUN = function(x) {

csv_file <- file.path("//filepathtoharddrivefolder", 
sprintf('chrstoremove_%s.csv', x)) # I pull just the id number from the file names

# summarise number of regions and cells
spp_region_summary <- data.table::fread(csv_file, sep = ",") %>%
dplyr::mutate(spp_id = x) %>%
dplyr::filter(presence == 1) %>% # select cell ids where the species is present
dplyr::left_join(cellid_df, by = "cell_id") %>% 
dplyr::group_by(region, spp_id) %>%
dplyr::summarise(num_cells = length(presence)) %>%
dplyr::ungroup()

# add some additional information
spp_region_summary <- spp_region_summary %>%
dplyr::left_join(spp_info, by = "spp_id") %>%
dplyr::left_join(spp_list, by = "spp_id") %>%
dplyr::select(region, spp_id, num_cells)

return(spp_region_summary)
})

spp_regions_df <- dplyr::bind_rows(spp_regions)

fwrite(spp_regions_df,"filepath.csv")

以前没有处理过这么多数据,所以我从来不用离开 tidyverse!

标签: rdplyrdata.tablebigdata

解决方案


我试图重现这一点。cellid_df我为每个单独的文件生成了 1000 万行。15 个“文件”只用了大约 40 秒(使用 reprex 增加了 20 秒)。

如果您可以让您的笔记本电脑运行半天左右,那么应该这样做。

几个建议:

  1. 如果您担心内存问题,可以写入文件。

  2. 由于spp_id在每次迭代中都是唯一的,因此您可以在合并后添加它。这将节省一些时间。

  3. “附加信息”可以加入到最终数据帧,因为它是键控的spp_id。在data.table,left_join(X,Y,by='id')将成为Y[X,on='id']

library(data.table)

spp_ids <- 1:15
set.seed(123)
N <- 1e7 # number of cell_ids

# Dummy cell ids + regions
cellid_df <- data.table(cell_id=1:N,region=sample(state.abb,N,replace = T))
head(cellid_df)
#>    cell_id region
#> 1:       1     NM
#> 2:       2     IA
#> 3:       3     IN
#> 4:       4     AZ
#> 5:       5     TN
#> 6:       6     WY
# 

outfile <- 'test.csv' 
if(file.exists(outfile))
  file.remove(outfile)

a=Sys.time()
l<- lapply(spp_ids, function(x){
  #Generate random file with cell_id and presence
  spp_file <- data.table(cell_id=1:N,presence=round(runif(N))) 

  present_cells <- cellid_df[spp_file[presence==1],on='cell_id'] # Filter and merge
  spp_region_summary <- present_cells[,.(spp_id=x,num_cells=.N),by=.(region)] # Summarise and add
  setcolorder(spp_region_summary,c('spp_id','region','num_cells')) # Reorder the columns if you want

  fwrite(spp_region_summary,outfile,append = file.exists(outfile)) # Write the summary to disk to avoid memory issues
  # If you want to keep it in memory, you can return it and use rbindlist
  # spp_region_summary
})
b=Sys.time()
b-a
#> Time difference of 1.019157 mins
# Check lines in file = (No of species) x (No of regions) + 1
R.utils::countLines(outfile) 
#> Registered S3 method overwritten by 'R.oo':
#>   method        from       
#>   throw.default R.methodsS3
#> [1] 751
#> attr(,"lastLineHasNewline")
#> [1] TRUE

reprex 包(v0.3.0)于 2019 年 12 月 20 日创建


推荐阅读