r - 带有条件逻辑错误的嵌套并行处理
问题描述
这个有点复杂,所以我认为不值得分享我正在使用的确切代码,但我应该能够使用伪代码很好地理解这一点:
一点背景知识:本质上,我正在尝试在嵌套的操作循环上进行并行计算。我有两个大函数,第一个需要运行并返回 TRUE 才能运行第二个函数,如果第二个函数运行,它需要循环多次迭代。现在这是一个嵌套循环,因为我需要针对各种场景多次运行上述整个操作。我尝试使用的伪代码如下:
Output <- foreach(1 to “I”, .packages=packages, .combine=rbind) %:%
Run the first function
If the first function is false:
Print and record
Else:
Foreach(1 to J, .packages=packages, .combine=rbind) %dopar%{
Run the second function
Create df summarizing each loop of second function
}
这是我正在尝试做的事情的简化版本以及我遇到的错误:
library(doParallel)
library(foreach)
func1 <- function(int1){
results <- list(int1,TRUE)
return(results)
}
func2 <- function(int2){
return(int1/int2)
}
int1list <- seq(1,10)
int2list <- seq(1,15)
out <- foreach(i=1:length(int1list),.combine=rbind) %:%
out1 <- func1(i)
if(out1[[2]]==FALSE){
print("fail")
next
} else{
foreach(j=1:length(int2),.combine=rbind) %dopar% {
int3 <- func2(j)
data.frame("Scenario"=i,"Result"=int3)
}
}
错误:func1(i) 中的错误:找不到对象“i”
当我运行上面的代码时,它基本上告诉我它甚至找不到对象“I”,我认为这正在发生,因为我在最里面的循环之外运行了一些调用“I”的东西。我之前已经能够让嵌套的并行循环工作,但是我没有任何需要在最内层循环之外运行的东西,所以我假设这是一个包不知道执行顺序的问题。
我有一个解决方法,我可以并行运行第一个函数,然后根据第一个循环的结果并行运行第二个函数(本质上是两个单独的循环而不是嵌套循环),但我想知道是否有让嵌套循环之类的东西工作的方法,因为我认为它会更有效。在生产中运行时,此代码可能需要数小时才能运行,因此节省一些时间是值得的。
解决方案
我不是 的专业人士foreach
,但有几件事很突出:
func2
两者都引用int1
,int2
但仅给出后者;这可能是您简化示例的产物,也许不是?您的代码需要包含在一个花括号中,即您需要从
out <- foreach(i=1:length(int1list),.combine=rbind) %:% out1 <- func1(i) if(out1[[2]]==FALSE) ...
至
out <- foreach(i=1:length(int1list),.combine=rbind) %:% { out1 <- func1(i) if(out1[[2]]==FALSE) ... }
- 的文档
foreach
建议二元运算符%:%
是在两个foreach
调用之间使用的嵌套运算符,但您没有这样做。我想我可以让它与%do%
(或%dopar%
)一起正常工作 - 我不认为
print
s 在并行foreach
循环中工作得很好......它可能在主节点上找到但不能在所有其他节点上工作,参考:使用 %dopar% 时如何打印 - 可能再次由于简化示例,您定义但实际上并未使用
int1list
(仅其长度)的内容,我将在此示例中进行补救 next
在“正常”的 R 循环中工作,而不是在这些专门的foreach
循环中;不过,这不是问题,因为您的if
/else
结构提供了相同的效果
这是您的示例,稍作修改以考虑上述所有内容。我添加UsedJ
以表明
library(doParallel)
library(foreach)
func1 <- function(int1){
results <- list(int1,int1>2)
return(results)
}
func2 <- function(int1,int2){
return(int1/int2)
}
int1list <- seq(1,3)
int2list <- seq(1,5)
out <- foreach(i=1:length(int1list),.combine=rbind) %do% {
out1 <- func1(int1list[i])
if(!out1[[2]]){
data.frame("Scenario"=i, "Result"=out1[[1]], UsedJ=FALSE)
# next
} else{
foreach(j=1:length(int2list),.combine=rbind) %dopar% {
int3 <- func2(out1[[1]], int2list[j])
data.frame("Scenario"=i,"Result"=int3, UsedJ=TRUE)
}
}
}
out
# Scenario Result UsedJ
# 1 1 1.00 FALSE
# 2 2 2.00 FALSE
# 3 3 3.00 TRUE
# 4 3 1.50 TRUE
# 5 3 1.00 TRUE
# 6 3 0.75 TRUE
# 7 3 0.60 TRUE
编辑
如果您没有看到并行化,可能是因为您还没有设置“集群”。foreach
基于's 使用操作符嵌套循环的方法,对工作流程还有一些其他更改以使其能够很好地并行化%:%
。
为了“证明”这是并行工作的,我添加了一些基于How can I print when using %dopar%的日志记录(因为并行进程并不print
像人们希望的那样)。
library(doParallel)
library(foreach)
Log <- function(text, ..., .port = 4000, .sock = make.socket(port=.port)) {
msg <- sprintf(paste0(as.character(Sys.time()), ": ", text, "\n"), ...)
write.socket(.sock, msg)
close.socket(.sock)
}
func1 <- function(int1) {
Log(paste("func1", int1))
Sys.sleep(5)
results <- list(int1, int1 > 2)
return(results)
}
func2 <- function(int1, int2) {
Log(paste("func2", int1, int2))
Sys.sleep(1)
return(int1 / int2)
}
日志代码的使用需要从该套接字读取的外部方式。我在这里使用 netcat (nc
或 Nmap 的ncat
)ncat -k -l 4000
。这项工作当然不需要工作,但在这里可以方便地查看事情的进展情况。(注意:在您尝试使用之前,此侦听器/服务器需要运行Log
。)
我无法让嵌套的“ foreach
-> func1
-> foreach
-> func2
”正确并行化func2
。根据睡眠,对 的三个调用应该需要 5 秒,func1
对 的五个调用需要 2 秒(每批三个)func2
,但需要 10 秒(对 的三个并行调用func1
,然后对 的五个连续调用func2
):
system.time(
out <- foreach(i=1:length(int1list), .combine=rbind, .packages="foreach") %dopar% {
out1 <- func1(int1list[i])
if (!out1[[2]]) {
data.frame(Scenario=i, Result=out1[[1]], UsedJ=FALSE)
} else {
foreach(j=1:length(int2list), .combine=rbind) %dopar% {
int3 <- func2(out1[[1]], int2list[j])
data.frame(Scenario=i, Result=int3, UsedJ=TRUE)
}
}
}
)
# user system elapsed
# 0.02 0.00 10.09
使用相应的控制台输出:
2018-11-12 11:51:17: func1 2
2018-11-12 11:51:17: func1 1
2018-11-12 11:51:17: func1 3
2018-11-12 11:51:23: func2 3 1
2018-11-12 11:51:24: func2 3 2
2018-11-12 11:51:25: func2 3 3
2018-11-12 11:51:26: func2 3 4
2018-11-12 11:51:27: func2 3 5
(请注意,订单不保证。)
所以我们可以先把它分解成计算的func1
东西:
system.time(
out1 <- foreach(i = seq_along(int1list)) %dopar% {
func1(int1list[i])
}
)
# user system elapsed
# 0.02 0.01 5.03
str(out1)
# List of 3
# $ :List of 2
# ..$ : int 1
# ..$ : logi FALSE
# $ :List of 2
# ..$ : int 2
# ..$ : logi FALSE
# $ :List of 2
# ..$ : int 3
# ..$ : logi TRUE
安慰:
2018-11-12 11:53:21: func1 2
2018-11-12 11:53:21: func1 1
2018-11-12 11:53:21: func1 3
然后工作func2
:
system.time(
out2 <- foreach(i = seq_along(int1list), .combine="rbind") %:%
foreach(j = seq_along(int2list), .combine="rbind") %dopar% {
Log(paste("preparing", i, j))
if (out1[[i]][[2]]) {
int3 <- func2(out1[[i]][[1]], j)
data.frame(i=i, j=j, Result=int3, UsedJ=FALSE)
} else if (j == 1L) {
data.frame(i=i, j=NA_integer_, Result=out1[[i]][[1]], UsedJ=FALSE)
}
}
)
# user system elapsed
# 0.03 0.00 2.05
out2
# i j Result UsedJ
# 1 1 NA 1.00 FALSE
# 2 2 NA 2.00 FALSE
# 3 3 1 3.00 FALSE
# 4 3 2 1.50 FALSE
# 5 3 3 1.00 FALSE
# 6 3 4 0.75 FALSE
# 7 3 5 0.60 FALSE
两秒(第一批三个是1秒,第二批两个是1秒)是我所期望的。安慰:
2018-11-12 11:54:01: preparing 1 2
2018-11-12 11:54:01: preparing 1 3
2018-11-12 11:54:01: preparing 1 1
2018-11-12 11:54:01: preparing 1 4
2018-11-12 11:54:01: preparing 1 5
2018-11-12 11:54:01: preparing 2 1
2018-11-12 11:54:01: preparing 2 2
2018-11-12 11:54:01: preparing 2 3
2018-11-12 11:54:01: preparing 2 4
2018-11-12 11:54:01: preparing 2 5
2018-11-12 11:54:01: preparing 3 1
2018-11-12 11:54:01: preparing 3 2
2018-11-12 11:54:01: func2 3 1
2018-11-12 11:54:01: preparing 3 3
2018-11-12 11:54:01: func2 3 2
2018-11-12 11:54:01: func2 3 3
2018-11-12 11:54:02: preparing 3 4
2018-11-12 11:54:02: preparing 3 5
2018-11-12 11:54:02: func2 3 4
2018-11-12 11:54:02: func2 3 5
您可以看到它func2
被正确调用了五次。不幸的是,您看到循环内部有很多“旋转”。当然,它实际上是一个无操作(如 2.05 秒运行时所证明的那样),因此节点上的负载可以忽略不计。
如果有人有办法阻止这种不必要的旋转,我欢迎发表评论或“竞争”答案。
推荐阅读
- php - 为什么提交后输入数据没有发送到控制器?[LARAVEL]
- laravel-envoy - 在特使脚本控制台命令中引发错误
- git - 当前的 git 分支是领先还是落后?
- gridfs - 获取 GridFS 中对象的 ID
- r - 为什么我的时间序列中没有正确的变量?
- c - 用作数组索引时的规范无符号文字后缀
- vba - 循环直到找不到搜索词
- ios - 将日期转换为格林尼治标准时间日期日期月份年份 Swift
- python-3.x - 尝试使用 Google Translate API 时,我不断收到 SSL 错误
- powershell - 为什么带有 -NewerThan 的 Test-Path 不能在这里工作