首页 > 解决方案 > 在没有错误的并行循环中附加 CSV

问题描述

我需要使用并行循环附加一个 CSV,我想知道是否有任何方法可以做到这一点而没有错误。

基本上,我需要处理很多数据,你不能把它全部塞进内存,所以我需要追加结果。这将永远lapply循环,所以我正在使用这个pbapply包。但是在附加文件时,由于通常会同时附加两个核心,因此会弄乱 csv 配置。

我假设有某种方法可以在某个集群正在处理文件时锁定与文件的连接,并且在该连接关闭时让其他集群稍等片刻以重试,但我找不到办法做到这一点.

这是我遇到的错误类型的示例:

library(parallel)
library(pbapply)
library(data.table)

write_random_thing <- function(x){
  require(data.table)

  y <- data.table(A = x, B = round(rnorm(10)*100,2))

  pth <- 'example.csv'
  fwrite(y, pth, append = TRUE)

  y
}

cl <- makeCluster(4)
xx <- pblapply(1:20, cl = cl, FUN = write_random_thing)
stopCluster(cl = cl)

yy <- rbindlist(xx)

zz <- fread('example.csv') # this will usually return an error

在这种情况下,yyandzz应该是相同的(即使顺序不同),但通常甚至无法读取文件,因为列数不是恒定的。

我一直在寻找一些解决方案,如果在您尝试写入文件时文件被锁定,它会休眠几秒钟并重试。存在这样的东西吗?

标签: rparallel-processingdata.table

解决方案


如果你需要并行写一些东西,你需要锁来确保两个进程不会同时写。

这很容易在 R 中使用包 {flock} 完成:

library(parallel)
library(pbapply)
library(data.table)

write_random_thing <- function(x){
  require(data.table)

  y <- data.table(A = x, B = round(rnorm(10)*100,2))

  pth <- 'example.csv'
  lock <- flock::lock(pth)
  fwrite(y, pth, append = TRUE)
  flock::unlock(lock)

  y
}

cl <- makeCluster(4)
xx <- pblapply(1:20, cl = cl, FUN = write_random_thing)
stopCluster(cl = cl)

yy <- rbindlist(xx)

zz <- fread('example.csv') # this will usually return an error

推荐阅读