asynchronous - 在 Julia 中异步运行两个具有不同工作线程的 pmap
问题描述
我有一个与 pmap 一起使用的函数来并行化它。我想每次使用 10 个工作人员异步运行 4 次此函数,但我不能同时运行两个或更多 pmap。
我在 linux 上使用 Julia v1.1 和 40-CPUs 机器。
using Distributed
addprocs(4)
@everywhere function TestParallel(x)
a = 0
while a < 4
println("Value = ",x, " in worker = ", myid())
sleep(1)
a += 1
end
end
a = WorkerPool([2,3])
b = WorkerPool([4,5])
c = [i for i = 1:10]
@sync @async for i in c
pmap(x-> TestParallel(x), a, c)
pmap(x-> TestParallel(x), b, c)
end
我希望有:
From worker 2: Value = 1 in worker = 2
From worker 3: Value = 2 in worker = 3
From worker 4: Value = 3 in worker = 4
From worker 5: Value = 4 in worker = 5
因此,c 的前两个元素进入第一个 pmap,接下来的两个元素进入第二个 pmap,然后谁先完成,就会得到接下来的两个元素。
现在我得到:
From worker 2: Value = 1 in worker = 2
From worker 3: Value = 2 in worker = 3
From worker 2: Value = 1 in worker = 2
From worker 3: Value = 2 in worker = 3
在第一个 pmap 完成 c 的所有元素后,第二个 pmap 重新开始求解所有元素。
From worker 2: Value = 9 in worker = 2
From worker 3: Value = 10 in worker = 3
From worker 5: Value = 2 in worker = 5
From worker 4: Value = 1 in worker = 4
解决方案
您的问题存在一些问题:使用绿色线程@sync
并且@async
您想要分配计算。语法@sync @async [some code]
异步生成代码并等待它完成。因此实际上它与 具有相同的含义[some code]
。
虽然您的问题尚不清楚,但我会假设您希望pmap
使用单独的工作池并行启动 2 s(这似乎是您最有可能尝试做的事情)。
在这种情况下,这里是代码:
using Distributed
addprocs(4)
@everywhere function testpar2(x)
for a in 0:3
println("Value = $x [$a] in worker = $(myid())")
sleep(0.2)
end
return 1000*myid()+x*x #I assume you want to return some value
end
a = WorkerPool([2,3])
b = WorkerPool([4,5])
c = collect(1:10)
@sync begin
@async begin
res1 = pmap(x-> testpar2(x), a, c)
println("Got res1=$res1")
end
@async begin
res2 = pmap(x-> testpar2(x), b, c)
println("Got res2=$res2")
end
end
运行上述代码时,您将看到如下内容:
...
From worker 5: Value = 10 [3] in worker = 5
From worker 2: Value = 10 [3] in worker = 2
From worker 3: Value = 9 [3] in worker = 3
Got res2=[4001, 5004, 5009, 4016, 5025, 4036, 4049, 5064, 4081, 5100]
Got res1=[2001, 3004, 2009, 3016, 2025, 3036, 3049, 2064, 3081, 2100]
Task (done) @0x00000000134076b0
您可以清楚地看到,两个 pmap 都在不同的工作池上并行运行。
推荐阅读
- clojure - 使用 clojure 从 api 获取数据并记录结果
- python - Python程序通过防火墙windows
- r - 如何在彼此下方的 ggplot 中将同一行的两个不同点一起添加但仍处于相同的 y 标签?
- php - Twig 'for each' 搜索结果 - 我的代码控制所有搜索结果,而它应该为每个结果运行
- javascript - 如何在javascript中复制键的值?
- spring-boot - 需要使用 Spring Boot 收听传入的电子邮件
- javascript - Phaser js:如何将播放器对象添加到我的场景中?
- google-apps-script - 在 Appscript 中复制带有条件的值
- python - Python解析API响应Json主体嵌套数据
- visual-studio-code - 如何防止 vscode 在“最近”列表中列出单个工作区?