首页 > 解决方案 > 如果可用,使多处理池使用免费内核

问题描述

我有以下一段代码,它使用一个工作池来执行一些操作。

def my_func( args ):
    
    low_index = args[0][0]
    up_index  = args[0][1]
    
    params    = args[1][0]
    A         = args[1][1]
    B         = args[1][2]
    
    print( "PID:", mp.current_process() )    
        
    for k in range( low_index, up_index ):
        
        a = params[k]
        # what if np.dot uses multi-threading?
        A = a*A + ( np.dot( A , B ) )*( np.dot( B, B ) )
        B = a*B + ( np.dot( B , A ) )*( np.dot( A, A ) )
    
    return A,B

if __name__ == '__main__':     
        
    ts = time()       
            
    import numpy as np
    params                 = np.linspace( 1, 10, 1000 )
    
    n_dim                  = 1000
    
    # the arrays A,B get modified with each call to the worker
    A                      = np.random.rand( n_dim, n_dim )
    B                      = np.random.rand( n_dim, n_dim )
    
    C                      = np.random.rand( 5*n_dim, 5*n_dim )
    D                      = np.random.rand( 5*n_dim, 5*n_dim )

    ncpus                  = psutil.cpu_count( logical=False )
    number_processes       = ncpus - 1
    total_items            = params.shape[0]  
    n_chunck               = int( ( total_items )/number_processes )
    intervals              = [ [ k*n_chunck, (k+1)*n_chunck ] for k in range( number_processes )   ]
    intervals[ -1 ][ -1 ]  = total_items          
   
    from itertools import repeat    
    objs_  = list( repeat( ( params,
                             copy.deepcopy( A  ), 
                             copy.deepcopy( B  ), 
                           ) , number_processes - 1 ) )
    objs_.append( ( params,
                 copy.deepcopy( C  ), 
                 copy.deepcopy( D  ), 
               )  )
    args_l = []
    
    for k in range( number_processes ):
        args_l.append( [ intervals[k] , objs_[k] ]  )
                
    
    pool        = mp.Pool( processes = ncpus )
    results     = pool.map( my_func, args_l  )
    
    pool.close()  
    pool.join()
    
    print( time() - ts )

最后一个过程(涉及 C 和 D 数组)将比其他过程花费更长的时间;因此,我希望在完成其他进程后,剩余进程(阵列尺寸更大的进程)可以有效利用所有可用的空闲内核。但是,我观察到最后一个进程的 CPU 使用率保持在 20% 左右(在我的机器中,我使用 6 个内核中的 5 个),因此在其余操作中效率非常低。有什么好的方法可以解决吗?

标签: python-3.xnumpymultiprocessing

解决方案


推荐阅读