python-3.x - 避免 Python 多处理中的竞争条件
问题描述
我有一段代码要为一组参数并行化。但是,我的工作函数广泛使用了 Numpy 和 Scipy 函数,众所周知,它们可以为某些操作解锁 GIL。我担心在使用多线程例程的这些情况下,使用 MP 模块可能会产生一些竞争条件。这是我正在尝试做的示例代码:
import numpy as np
import multiprocessing as mp
import psutil
import copy
def my_func( args ):
low_index = args[0][0]
up_index = args[0][1]
params = args[1][0]
A = args[1][1]
B = args[1][2]
print( "PID:", mp.current_process() )
for k in range( low_index, up_index ):
a = params[k]
# what if np.dot uses multi-threading?
A = a*A + ( np.dot( A , B ) )/( np.dot( B, B ) )
B = a*B + ( np.dot( B , A ) )/( np.dot( A, A ) )
return A,B
if __name__ == '__main__':
params = np.linspace( 1, 10, 100 )
n_dim = 1000
# the arrays A,B get modified with each call to the worker
A = np.random.rand( n_dim, n_dim )
B = np.random.rand( n_dim, n_dim )
ncpus = psutil.cpu_count( logical=False )
number_processes = ncpus - 1
total_items = params.shape[0]
n_chunck = int( ( total_items )/number_processes )
intervals = [ [ k*n_chunck, (k+1)*n_chunck ] for k in range( number_processes ) ]
intervals[ -1 ][ -1 ] = total_items
from itertools import repeat
objs_ = list( repeat( ( params,
copy.deepcopy( A ),
copy.deepcopy( B ),
) , number_processes ) )
args_l = []
for k in range( number_processes ):
args_l.append( [ intervals[k] , objs_[k] ] )
pool = mp.Pool( processes = ncpus )
results = pool.map( my_func, args_l )
pool.close()
pool.join()
所以我想将工作分配到机器不同核心的“params”数组上。然而,我真正的“my_func”比这个复杂得多,并且大量使用了 Numpy/Scipy 例程。我注意到实际上,按内核总数分隔作业实际上会给每个任务带来很大的执行时间,那么这与多线程干扰 MP 模块或其他什么有关吗?此外,由于每个工人的 A、B 都发生了变化,因此整个“fork”进程应该是“spawn”,因为子进程正在修改数据(如果我正确理解“fork”的作用);因此需要 RAM 中的独立副本。
解决方案
推荐阅读
- javascript - Quill js 在编辑器中放置嵌入或 html 内容
- r - 基于R中多元回归中的变量从lm()中提取R2列表
- java - 以多线程方式浏览 Jms Queue
- git - 通过源树提交时如何停止打开流失败错误
- python - tkinter.Toplevel 大小自动适合其小部件
- ms-access - 基于在一个文本字段中具有多个条件的表单进行查询
- java - 使用 dnsjava 使用 java 查询?
- javascript - 当原始请求是 POST 方法时,身份验证刷新令牌不起作用
- node.js - 从 docker 容器内发布 NPM 包
- vba - 如何在启动宏时将浏览文件夹路径设置为最近使用的目录?