首页 > 解决方案 > 如何在 R 中的函数上使用并行处理而不弹出此错误?

问题描述

我在尝试并行化我的功能时遇到了问题。我在网上查看了很多可用的提示,但是当我尝试执行建议的代码时,我收到一个错误“match.fun(FUN) 中的错误:缺少参数“FUN”,没有默认值此外:警告消息:1:在.Internal(gc(详细,重置,完整)):关闭未使用的连接6 ...“

我的功能是:

## tau-leap Gillespie algorithm function
tauLeapG <- function(beta, # transmission rate
                     theta, # dispersal scale
                     b=1, # kernel shape parameter, 1 for exponential
                     sigma=0, # asymptomatic period, used for outputing the time series
                     q0=0, # starting incidence if ppp is without marks
                     q.end=1, # stoping condition 1: incidence lvl
                     t.end=Inf, # stoping condition 2: time after first simulated time step
                     area.host=10, # surface area occupied by one host
                     delta.t=1, # time step
                     ppp, # point pattern as a ppp object, optinally with marks 1/0 for infected/healthy
                     dist.mat=NULL){ # matrix distance if its computation is to be avoided here (for e.g. repeated calls)
  
  ## if the point pattern has no marks, generate some randomly that fits q0
  if (is.null(marks(ppp))){
    inf.start <- max(1, round(ppp$n * q0))
    marks(ppp) <- sample(c(rep(FALSE, ppp$n-inf.start), rep(TRUE, inf.start)))
  }
  
  ## compute distance matrix if not provided
  if (is.null(dist.mat)){ 
    ## add the kernel computation that can be added directly on the dist matrix to reduce comp time
    dist.mat <- exp(-pairdist(ppp)^b / theta^b)
    diag(dist.mat) <- NA
  }
  
  ## function that compute infection event probability, based on the dispersal kernel
  k.norm <- beta * area.host * (b/(2*pi*theta^2*gamma(2/b))) # constant part of the exponential power kernel
  infection <- function(infected, dist){
    inf <-  matrix(k.norm * dist[infected,!infected],
                   nrow=sum(infected), byrow=FALSE)
    inf[is.na(inf)] <- 0
    inf
  }
  
  ## starting time
  time <- 0
  ## inititate the heavy dataframe that will aggregate all changes
  df.big <- data.frame(time=0, who=which(ppp$marks), t(ppp$marks))
  
  ## computation loop
  while (any(!ppp$marks) & time <= t.end & mean(ppp$marks) < q.end){
    ## infection event probaility
    events <- infection(ppp$marks, dist=dist.mat)
    ## random proisson draws
    new.infected <- which(!ppp$marks)[rpois(n=sum(!ppp$marks), lambda=apply(events, 2, sum) * delta.t) > 0]
    ## change marks of newly infected
    ppp$marks[new.infected] <- TRUE
    ## increment time
    time <- time + delta.t
    ## if some infection, increment the big dataframe
    if (length(new.infected) > 0){
      df.big <- rbind(df.big, data.frame(time=time, who=new.infected, t(ppp$marks)))
    }
    ## print a dot per new infection
    # cat(paste0(rep('.', length(new.infected)), collapse = '')) ## comment for quiet
  }
  
  ## make compact, time only, version of the big dataframe
  times.i <- unique(df.big[,1])
  times.d <- times.i + sigma
  times <- sort(unique(c(times.i, times.d)))
  infected <- sapply(times, FUN=function(t) sum(t >= df.big[,1]))
  detectable <- sapply(times, FUN=function(t) sum(t >= df.big[,1] + sigma))
  df.small <- data.frame(time=times, infected=infected, detectable=detectable)
  
  ## out put the simplified time series, and the big one
  list(df.small[df.small$time <= max(df.big$time),], df.big) 
} 

然后我创建一个风景:

library('spatstat')
library('ggplot2')
library('dplyr')
library('reshape2')

n <- 1000 # number of hosts
dim <- 1000 # dimension of the landscape
landscape <- ppp(x=runif(n)*dim, y=runif(n)*dim, 
                 window = owin(xrange = c(0, dim), yrange = c(0, dim)))
## give marks to the process, e.g. 1 infected randomly selected
marks(landscape) <- sample(c(TRUE, rep(FALSE, n-1)))

我正在尝试在下面并行化此功能:


output <- tauLeapG(beta=1,
                   theta=.5,
                   b=.4,
                   sigma=sigma, 
                   delta.t=10,
                   ppp=landscape)

我努力了:

install.packages("Parallel")
library("Parallel")
install.packages("doParallel")
library("doParallel")

no_cores <- detectCores(logical = TRUE)

cl<- makeCluster ( 4 , type = "SOCK" )
registerDoParallel(cl)
clusterExport(cl,list("tauLeapG","landscape"))
system.time(
  output<-c(parApply(cl,landscape,1,fun=tauLeapG))
)

但这会返回上面的错误。我对并行处理很陌生,所以有人可以帮我弄清楚如何并行化这段代码吗?谢谢。

标签: rparallel-processing

解决方案


推荐阅读