首页 > 解决方案 > R使用doParallel按因子级别从多个文件中拆分大量数据集的有效方法

问题描述

我有大量文件需要读入 R,将它们放入数据框中,然后按特定列(“praacid”)拆分。计算将在集群上执行。我的代码使用较少数量的文件,但是使用所有文件时的数据量对于 R 来说太大了。

期望的输出:包含因子“pracid”的每个级别的数据集的单独文件。例如,file1 包含 pracid==1 的所有数据,file2 包含 pracid==None 的所有数据,等等。

data.table::rbindlist(dat) 中的错误:列表中的总行数为 3479242206,大于最大行数,当前为 21474833647。

我还必须在另一个包含更多文件的目录中工作。处理这么多数据的最有效方法是什么?

library(plyr)
library(tidyverse) 
library(broom)
library(dplyr) 
library(sqldf)
library(data.table)
library(stringr)
library(lubridate) 
library(doParallel) 

procs = as.numeric(Sys.getenv("MOAB_PROCCOUNT"))
registerDoParallel(cores=procs)

files = list.files(path = paste0(data_dir, "Consultation"), pattern = "*.txt$", full.names = T)

dat = foreach(i = files) %dopar% read.delim(i)
dat = as.data.frame(data.table::rbindlist(dat))
dat = dat %>% distinct(consid, .keep_all = TRUE) %>% arrange(pracid)

write.table(dat, file = paste0(output_dir, "consultation.txt"), sep = "\t")

# Split by practice

dat.split = as.data.table(dat)
dat.split = split(dat.split, by = c("pracid"))

for (i in 1:length(dat.split)){
  write.table(dat.split[i], file=paste0(practice_dir, "Consultation/",
                                        names(dat.split)[i], ".txt"), sep = "\t",
              row.names=FALSE, col.names = colnames(dat.split[[1]]))
}

文件具有以下格式:

patid    consid    pracid    staffid
50000082035    23408234    2002    12003
235235    234234    45666    209

标签: rparallel-processingbigdatadoparallel

解决方案


这在 bash 和 awk 中比 R 更容易和更快:

#!/bin/bash

cd /your/data/dir
# it's a better practice to make a separate outdir:
mkdir Consultation_bypracid
find Consultation -name "*.txt" -print0 | xargs -0 awk -F$'\t' '
BEGIN{RS="\r\n|\n|\r";ORS="\n"}
{
  # get column index of pracid column
  if(ci==""){
    header=$0
    for(ci=1;ci<=NF;ci++){
      if($ci=="pracid") break
    }
  }

  if(FNR>1){ # skip input file header row
    pracid = $ci
    if(outpath[pracid]==""){
      outpath[pracid] = "Consultation_bypracid/"pracid".txt"
      print header >> outpath[pracid]
    }
    print >> outpath[pracid]
  }
}'

在我的系统上,一个非常大的带有非 SSD 磁盘的 Google 虚拟机,它每分钟处理大约 1 GB 的输入数据。此任务使用大量磁盘 I/O 和非常少的 cpu,因此并行化无济于事,您的磁盘速度将决定其运行速度。

将文件输出到名为 [first pracid].txt...[last pracid].txt 的 Consultation_bypracid


推荐阅读