python - 如何使用多线程加速嵌套for循环计算?
问题描述
我正在尝试对大型数组执行数值积分,并且计算需要很长时间。我尝试使用 numba 和 jit 装饰器来加速我的代码,但不支持 numpy.trapz。
我的新想法是创建 n 个线程来并行运行计算,但我想知道如何做到这一点,或者它是否可行?
引用下面的代码
我可以让 sz[2] 多个线程同时运行,调用 ZO_SteadState 来计算值吗?
for i in range(sz[1]):
phii = phi[i]
for j in range(sz[2]):
s = tau[0, i, j, :].reshape(1, n4)
[R3, PHI3, S3] = meshgrid(rprime, phiprime, s)
BCoeff = Bessel0(bm * R3)
SS[0, i, j] = ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v)
有问题的计算。
@jit()
def ZO_SteadyState(alpha, b,bm,BCoeff,Bessel_Denom, k2,maxt,phii, PHI2, PHI3, phiprime,R3,rprime,s,S3, T,v):
g = 1000000 * exp(-(10 ** 5) * (R3 - (b / maxt) * S3) ** 2) * (
exp(-(10 ** 5) * (PHI3 - 0) ** 2) + exp(-(10 ** 5) * (PHI3 - 2 * np.pi) ** 2) + exp(
-(10 ** 5) * (PHI3 - 2 * np.pi / 3) ** 2) + exp(
-(10 ** 5) * (PHI3 - 4 * np.pi / 3) ** 2)) # stationary point heat source.
y = R3 * ((np.sqrt(2) / b) * (1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2))))))
* (BCoeff / Bessel_Denom)) * np.cos(v * (phii - PHI3)) * g
x = (np.trapz(y, phiprime, axis=0)).reshape(1, 31, 300)
# integral transform of heat source. integral over y-axis
gbarbar = np.trapz(x, rprime, axis=1)
PHI2 = np.meshgrid(phiprime, s)[0]
sz2 = PHI2.shape
f = h2 * 37 * Array_Ones((sz2[0], sz[1])) # boundary condition.
fbar = np.trapz(np.cos(v * (phii - PHI2)) * f, phiprime, 1).reshape(1, n4) # integrate over y
A = (alpha / k) * gbarbar + ((np.sqrt(2) * alpha) / k2) * (
1 / (np.sqrt((H2 ** 2 / bm ** 2) + (1 - (v ** 2 / (bm ** 2 * b ** 2)))))) * fbar
return np.trapz(exp(-alpha * bm ** 2 * (T[0, i, j] - s)) * A, s)
解决方案
另一个概念实现,进程产生进程(编辑:jit 测试):
import numpy as np
# better pickling
import pathos
from contextlib import closing
from numba import jit
#https://stackoverflow.com/questions/47574860/python-pathos-process-pool-non-daemonic
import multiprocess.context as context
class NoDaemonProcess(context.Process):
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonPool(pathos.multiprocessing.Pool):
def Process(self, *args, **kwds):
return NoDaemonProcess(*args, **kwds)
# matrix dimensions
x = 100 # i
y = 500 # j
NUM_PROCESSES = 10 # total NUM_PROCESSES*NUM_PROCESSES will be spawned
SS = np.zeros([x, y], dtype=float)
@jit
def foo(i):
return (i*i + 1)
@jit
def bar(phii, j):
return phii*(j+1)
# The code which is implemented down here:
'''
for i in range(x):
phii = foo(i)
for j in range(y):
SS[i, j] = bar(phii, j)
'''
# Threaded version:
# queue is in global scope
def outer_loop(i):
phii = foo(i)
# i is in process scope
def inner_loop(j):
result = bar(phii,j)
# the data is coordinates and result
return (i, j, result)
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
res = list(pool.imap(inner_loop, range(y)))
return res
with closing(NoDaemonPool(processes=NUM_PROCESSES)) as pool:
results = list(pool.imap(outer_loop, range(x)))
result_list = []
for r in results:
result_list += r
# read results from queue
for res in result_list:
if res:
i, j, val = res
SS[i,j] = val
# check that all cells filled
print(np.count_nonzero(SS)) # 100*500
编辑:解释。
这段代码中所有复杂性的原因是我想做比 OP 要求的更多的并行化。如果仅并行化内循环,则保留外循环,因此对于外循环的每次迭代,都会创建新的进程池并执行内循环的计算。只要在我看来,该公式不依赖于外循环的其他迭代,我决定并行化所有内容:现在外循环的计算被分配给池中的进程,之后每个“外循环”进程创建自己的新池,并产生额外的进程来执行内部循环的计算。
我可能错了,外循环不能并行化;在这种情况下,您只能保留内部进程池。
使用进程池可能不是最佳解决方案,因为创建和销毁池会消耗时间。更有效(但需要手动模式)的解决方案是一劳永逸地实例化 N 个进程,然后将数据输入它们并使用多处理 Queue() 接收结果。所以你应该首先测试这个多处理解决方案是否给你足够的加速(如果构建和销毁池的时间比运行时间短,就会发生这种情况Z0_SteadyState
)。
下一个复杂因素是人工无守护程序池。守护进程用于优雅地停止应用程序:当主程序退出时,守护进程会静默终止。但是,守护进程不能产生子进程。在您的示例中,您需要等到每个进程结束才能检索数据,因此我将它们设为非守护进程以允许生成子进程来计算内部循环。
数据交换:我认为与实际计算相比,填充矩阵所需的数据量和时间是很小的。所以我使用池和pool.imap
函数(这比 快一点.map()
。你也可以尝试.imap_unordered()
,但在你的情况下它不应该有显着差异)。因此,内部池一直等到所有结果都被计算出来并将它们作为列表返回。因此,外部池返回必须连接的列表列表。然后在单个快速循环中根据这些结果重建矩阵。
注意事项with closing()
:该语句下的事情完成后,它会自动关闭池,避免僵尸进程消耗内存。
此外,您可能会注意到我奇怪地在另一个函数中定义了一个函数,并且在进程内部我可以访问一些尚未传递到那里的变量:i
, phii
. copy-on-change
发生这种情况是因为进程可以访问使用策略(默认fork
模式)启动它们的全局范围。这不涉及酸洗并且速度很快。
最后一条评论是关于使用pathos
library 而不是 standard multiprocessing
, concurrent.futures
,subprocess
等。原因是使用pathos
了更好的 pickling 库,因此它可以序列化标准库不能序列化的函数(例如 lambda 函数)。我不知道你的功能,所以我使用了更强大的工具来避免进一步的问题。
最后一件事:多处理与线程。您可以将pathos
处理池更改为标准ThreadPoolExecutor
from concurrent.futures
,就像我刚开始该代码时所做的那样。但是,在执行过程中,在我的系统上,CPU 仅加载 100%(即使用了一个内核,看起来所有 8 个内核都加载了 15-20%)。我不太擅长理解线程和进程之间的差异,但在我看来,进程允许利用所有内核(每个内核 100%,总共 800%)。
推荐阅读
- javascript - 如何减少 web worker HTML 5 的 CPU 使用率
- python - Pandas:基于数据框的布尔列计算运行错误的干净方法?
- logging - 来自不同提供商的虚拟机的谷歌云日志无法正常工作
- laravel - livewire 中的 route /$id 无法访问数据
- apache-kafka - 是否可以在 Debezium 中禁用初始数据快照?
- sql - PostreSQL 使用带有自定义参数的命名 exec
- vue.js - Vue:在 v-for 中使用方法但 getter 尚未准备好
- visual-studio-code - Visual Studio Code - 拒绝访问 unins000.exe
- c# - c#关于多态性的错误。在程序结束时使用Get String()方法它说没有像obj.earnings()这样的方法
- android - Android:我正在尝试使用 Jobscheduler 删除图库中的图片,但无法删除它们