首页 > 解决方案 > Julia 使用带有 @sync/@async 的 Distributed 和 SharedArrays 进行许多分配

问题描述

我试图了解如何将包 Distributed 与 SharedArrays 一起使用来与 julia 一起执行并行操作。作为一个例子,我采用了一个简单的蒙特卡罗平均方法

using Distributed 
using SharedArrays
using Statistics

const NWorkers = 2
const Ns = Int(1e6)


function parallelRun()

    addprocs(NWorkers)
    procsID = workers()

    A = SharedArray{Float64,1}(Ns)
    println("starting loop")

    for i=1:2:Ns
        #parallel block
        @sync for p=1:NWorkers
                @async A[i+p-1] = remotecall_fetch(rand,procsID[p]);
        end
    end

    println(mean(A))
end


function singleRun()
    A = zeros(Ns)
    for i=1:Ns
        A[i] = rand()
    end
 
    println(mean(A))
end

但是,如果我 @time 两个函数我都会得到

julia> @time singleRun()
0.49965531193003165
  0.009762 seconds (17 allocations: 7.630 MiB)
julia> @time parallelRun()
0.4994892300029917
 46.319737 seconds (66.99 M allocations: 2.665 GiB, 1.01% gc time)

特别是在并行版本中有更多的分配,这使得代码慢得多。

我错过了什么吗?

顺便说一句,我使用 @sync 和 @async 的原因(即使在这个框架中不需要,因为每个样本都可以按随机顺序计算)只是因为我想应用相同的策略来数值求解抛物线 PDE线上的东西

    for t=1:time_steps

        #parallel block
        @sync for p=1:NWorkers
                @async remotecall(make_step_PDE,procsID[p],p);
        end
    end

由 p 索引的每个工人都应该在我的方程的一组不相交的索引上工作。

提前致谢

标签: parallel-processingjulia

解决方案


您的代码存在以下问题:

  • 您正在为 a 的每个值生成一个远程任务,i这很昂贵,最终需要很长时间。基本上,经验法则是使用@distributed宏来平衡工作人员之间的负载,这将平均分担工作。
  • 永远不要放入addprocs你的工作函数中,因为每次运行它,每次添加新进程时——生成一个新的 Julia 进程也需要大量时间,这包括在你的测量中。实际上,这意味着您希望addprocs在执行初始化的脚本的某些部分运行,或者可能通过julia使用-p --machine-file参数启动进程来添加进程
  • 最后,总是运行@time两次——第一次测量@time也是测量编译时间,在分布式环境中编译比在单个进程中花费的时间要长得多。

你的函数应该或多或少像这样

using Distributed, SharedArrays
addprocs(4)
@everywhere using Distributed, SharedArrays
function parallelRun(Ns)
    A = SharedArray{Float64,1}(Ns)
    @sync @distributed for i=1:Ns
         A[i] = rand();
    end
    println(mean(A))
end

您还可以考虑在工作人员之间完全拆分数据。在某些情况下,这不太容易出现错误,并允许您分布在许多节点上:

using Distributed, DistributedArrays
addprocs(4)
@everywhere using Distributed, DistributedArrays
function parallelRun2(Ns)
    d = dzeros(Ns) #creates an array distributed evenly on all workers
    @sync @distributed for i in 1:Ns
        p = localpart(d)
        p[((i-1) % Int(Ns/nworkers())+1] = rand()
    end
    println(mean(d))
end

推荐阅读