首页 > 解决方案 > 避免 Python 多处理中的竞争条件

问题描述

我有一段代码要为一组参数并行化。但是,我的工作函数广泛使用了 Numpy 和 Scipy 函数,众所周知,它们可以为某些操作解锁 GIL。我担心在使用多线程例程的这些情况下,使用 MP 模块可能会产生一些竞争条件。这是我正在尝试做的示例代码:

import numpy as np
import multiprocessing as mp
import psutil
import copy

def my_func( args ):
    
    low_index = args[0][0]
    up_index  = args[0][1]
    
    params    = args[1][0]
    A         = args[1][1]
    B         = args[1][2]
    
    print( "PID:", mp.current_process() )
    
    for k in range( low_index, up_index ):
        
        a = params[k]
        # what if np.dot uses multi-threading?
        A = a*A + ( np.dot( A , B ) )/( np.dot( B, B ) )
        B = a*B + ( np.dot( B , A ) )/( np.dot( A, A ) )
    
    return A,B

if __name__ == '__main__': 
    
    params                 = np.linspace( 1, 10, 100 )
    
    n_dim                  = 1000
    
    # the arrays A,B get modified with each call to the worker
    A                      = np.random.rand( n_dim, n_dim )
    B                      = np.random.rand( n_dim, n_dim )

    ncpus                  = psutil.cpu_count( logical=False )
    number_processes       = ncpus - 1
    total_items            = params.shape[0]  
    n_chunck               = int( ( total_items )/number_processes )
    intervals              = [ [ k*n_chunck, (k+1)*n_chunck ] for k in range( number_processes )   ]
    intervals[ -1 ][ -1 ]  = total_items          
   
    from itertools import repeat    
    objs_  = list( repeat( ( params,
                             copy.deepcopy( A  ), 
                             copy.deepcopy( B  ), 
                           ) , number_processes ) )
    args_l = []
    
    for k in range( number_processes ):
        args_l.append( [ intervals[k] , objs_[k] ]  )
                
    
    pool        = mp.Pool( processes = ncpus )
    results     = pool.map( my_func, args_l  )
    
    pool.close()  
    pool.join()
    

所以我想将工作分配到机器不同核心的“params”数组上。然而,我真正的“my_func”比这个复杂得多,并且大量使用了 Numpy/Scipy 例程。我注意到实际上,按内核总数分隔作业实际上会给每个任务带来很大的执行时间,那么这与多线程干扰 MP 模块或其他什么有关吗?此外,由于每个工人的 A、B 都发生了变化,因此整个“fork”进程应该是“spawn”,因为子进程正在修改数据(如果我正确理解“fork”的作用);因此需要 RAM 中的独立副本。

标签: python-3.xmultiprocessing

解决方案


推荐阅读