multithreading - Julia multithreading does not scale for embarrassingly parallel work
Problem description
The code below computes the average number of draws needed to obtain 50 unique cards from several sets. Importantly, this problem needs very little RAM and shares no variables when launched in multithreaded mode. When launched with more than four threads to run 400,000 simulations, it consistently takes about one second longer than two processes launched simultaneously, each running 200,000 simulations. This has been bugging me and I cannot find an explanation.
Here is the Julia code in epic_draw_multi_thread.jl:
using Random
using Printf
import Base.Threads.@spawn

function pickone(dist)
    n = length(dist)
    i = 1
    r = rand()
    while r >= dist[i] && i < n
        i += 1
    end
    return i
end

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist)
    item_type = pickone(type_dist)
    item_number = pickone(unique_elements_dist[item_type])
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)
    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x)
        if reset
            items .= 0
        else
            items[items .> 1] .-= 1  # broadcast .-= needed; plain -= errors on an indexed view
        end
    end
    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m)
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
Here is the command issued at the shell to run on two worker threads, along with the output and timing results:
time julia --threads 3 epic_draw_multi_thread.jl 400000
Started computing...
Started average_for_unique on thread 3 with n = 200000
Started average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 2 with n = 200000
Completed average_for_unique on thread 3 with n = 200000
70.44460749999999
real 0m14.347s
user 0m26.959s
sys 0m2.124s
These are the commands issued at the shell to run two processes, each with half the job size, along with the output and timing results:
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
time julia --threads 1 epic_draw_multi_thread.jl 200000 &
Started computing...
Started computing...
Started average_for_unique on thread 1 with n = 200000
Started average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
Completed average_for_unique on thread 1 with n = 200000
70.434375
real 0m12.919s
user 0m12.688s
sys 0m0.300s
70.448695
real 0m12.996s
user 0m12.790s
sys 0m0.308s
No matter how many times I repeat the experiment, multithreaded mode always comes out slower. Notes:
- I wrote parallel code to approximate the value of π and did not run into this problem. Moreover, I cannot see anything in this code that could cause any contention between threads and slow things down.
- When running with more than one thread, I use the number of threads minus one to do the drawing. Without that, the last thread appears to hang. The statement
t = max(Threads.nthreads() - 1, 1)
can be changed to t = Threads.nthreads()
to use the exact number of threads available.
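As an aside, the fan-out pattern used above (spawn one task per thread, then fetch and average) can be sketched minimally like this; simulate here is a hypothetical stand-in for average_for_unique:

```julia
import Base.Threads.@spawn

# Hypothetical stand-in for average_for_unique: averages n uniform draws.
simulate(n) = sum(rand() for _ in 1:n) / n

function parallel_mean(n)
    t = Threads.nthreads()            # one task per thread, including thread 1
    m = n ÷ t
    tasks = [@spawn simulate(m) for _ in 1:t]
    return sum(fetch.(tasks)) / t     # fetch waits for each task to finish
end

println(parallel_mean(1_000_000))
```

Because @sync (or the fetch calls themselves) blocks until every task finishes, the main thread is free to run one of the worker tasks while waiting, so spawning on all nthreads() tasks is generally fine.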
Edit, November 20, 2020
Implemented Przemyslaw Szufel's suggestions. Here is the new code:
using Random
using Printf
import Base.Threads.@spawn
using BenchmarkTools

function pickone(dist, mt)
    n = length(dist)
    i = 1
    r = rand(mt)
    while r >= dist[i] && i < n
        i += 1
    end
    return i
end

function init_items(type_dist, unique_elements)
    return zeros(Int32, length(type_dist), maximum(unique_elements))
end

function draw(type_dist, unique_elements_dist, mt)
    item_type = pickone(type_dist, mt)
    item_number = pickone(unique_elements_dist[item_type], mt)
    return item_type, item_number
end

function draw_unique(type_dist, unique_elements_dist, items, x, mt)
    while sum(items .> 0) < x
        item_type, item_number = draw(type_dist, unique_elements_dist, mt)
        items[item_type, item_number] += 1
    end
    return sum(items)
end

function average_for_unique(type_dist, unique_elements_dist, x, n, mt, reset=true)
    println(@sprintf("Started average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    items = init_items(type_dist, unique_elements)
    tot_draws = 0
    for i in 1:n
        tot_draws += draw_unique(type_dist, unique_elements_dist, items, x, mt)
        if reset
            items .= 0
        else
            items[items .> 1] .-= 1  # broadcast .-= needed; plain -= errors on an indexed view
        end
    end
    println(@sprintf("Completed average_for_unique on thread %d with n = %d", Threads.threadid(), n))
    return tot_draws / n
end

function parallel_average_for_unique(type_dist, unique_elements_dist, x, n, reset=true)
    println("Started computing...")
    t = max(Threads.nthreads() - 1, 1)
    mts = MersenneTwister.(1:t)
    m = Int32(round(n / t))
    tasks = Array{Task}(undef, t)
    @sync for i in 1:t
        task = @spawn average_for_unique(type_dist, unique_elements_dist, x, m, mts[i])
        tasks[i] = task
    end
    sum(fetch(t) for t in tasks) / t
end
type_dist = [0.3, 0.3, 0.2, 0.15, 0.05]
const cum_type_dist = cumsum(type_dist)
unique_elements = [21, 27, 32, 14, 10]
unique_elements_dist = [[1 / unique_elements[j] for i in 1:unique_elements[j]] for j in 1:length(unique_elements)]
const cum_unique_elements_dist = [cumsum(dist) for dist in unique_elements_dist]
str_n = ARGS[1]
n = parse(Int64, str_n)
avg = @btime parallel_average_for_unique(cum_type_dist, cum_unique_elements_dist, 50, n)
print(avg)
Updated benchmarks:
Threads          @btime    Linux time
1 (2 processes)   9.927 s  0m44.871s
2 (1 process)    20.237 s  1m14.156s
3 (1 process)    14.302 s  1m2.114s
Solution
There are two problems here:
- You are not measuring performance correctly
- When generating random numbers in threads, you should have a separate MersenneTwister random state per thread for best performance (otherwise your random state is shared across all threads, and its access has to be synchronized)
Currently you are measuring "Julia start time" + "code compilation time" + "run time". Compiling multithreaded code obviously takes longer than compiling single-threaded code, and starting Julia itself takes a second or two.
You have two options here. The easiest is to use the BenchmarkTools @btime macro to measure execution time from within the code. The other option is to turn your code into a package and compile it into a Julia image via PackageCompiler. However, you would still be measuring "Julia start time" + "Julia execution time".
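For illustration, a minimal @btime sketch (the array xs is just an example; the $ interpolation keeps the cost of accessing a non-constant global out of the measurement):

```julia
using BenchmarkTools

xs = rand(1_000)

# @btime runs the expression many times, prints the minimum time,
# and returns the value of the benchmarked expression.
s = @btime sum($xs)
```

Without the $, xs would be looked up as an untyped global on every call, so the timing would include dynamic-dispatch overhead that your real code does not have.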
The random number states can be created as:
mts = MersenneTwister.(1:Threads.nthreads());
and then used like rand(mts[Threads.threadid()]).
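Putting both pieces of advice together, here is a minimal sketch of per-task RNG state: each task owns one seeded MersenneTwister, so nothing is shared between threads (the function name is hypothetical):

```julia
using Random
import Base.Threads.@spawn

# One seeded MersenneTwister per task: no shared RNG state, so no
# synchronization between threads is needed.
function parallel_sum_of_rands(n)
    t = Threads.nthreads()
    mts = MersenneTwister.(1:t)
    m = n ÷ t
    tasks = [@spawn sum(rand(mts[i]) for _ in 1:m) for i in 1:t]
    return sum(fetch.(tasks)) / (m * t)
end

println(parallel_sum_of_rands(1_000_000))  # ≈ 0.5
```

Handing mts[i] to each task at spawn time, as in this sketch, also stays correct if a task migrates between threads, whereas indexing by Threads.threadid() inside the task assumes the task stays pinned to one thread.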