Multiprocessing a series of loops in Python?

Problem description

I have a very large array that I need to create, with more than 10^7 columns, which needs to be filtered/modified according to certain criteria. There are 24 different sets of criteria (from the 2x4x3 combinations), meaning the filtering/modification has to be performed 24 times, and each result is saved to a different designated directory.

Since this takes a very long time, I am considering using multiprocessing to speed up the process. Can anyone help me? Here is some sample code:

import itertools
import numpy as np

sample_size = 1000000
variables = 25
x_array = np.random.rand(variables, sample_size)
      
x_dir = ['x1', 'x2']
y_dir = ['y1', 'y2', 'y3', 'y4']  
z_dir = ['z1', 'z2', 'z3']

x_directories = [0, 1]
y_directories = [0, 1, 2, 3]
z_directories = [0, 1, 2]

directory_combinations = itertools.product(x_directories, y_directories, z_directories)

main_dir = '.'  # base output directory

for k, t, h in directory_combinations:

    target_dir = main_dir + '/' + x_dir[k] + '/' + y_dir[t] + '/' + z_dir[h]

    for i in range(sample_size):
        # x_array gets filtered/modified
        ...

    # x_array gets saved in target_dir as a dataframe after modification

Basically, with multiprocessing I would like each loop to be handled by one of my 16 available cores, or alternatively to speed up each loop iteration by using all 16 cores.

Thanks in advance!

Tags: python multithreading multiprocessing

Solution


The code below first creates x_array in shared memory, then initializes each process in the pool with a global variable x_array that refers to that shared array.
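The underlying trick, sketched in isolation (assuming a float64 array, hence ctypes.c_double; the variable names here are illustrative): a lock-free mp.Array can be wrapped by NumPy without copying, so the ctypes buffer and the ndarray view address the same memory.

```python
import ctypes
import multiprocessing as mp
import numpy as np

# Copy a NumPy array into a lock-free shared mp.Array, then wrap the
# shared buffer back into an ndarray view of the same memory:
src = np.random.rand(3, 4)
shared = mp.Array(ctypes.c_double, src.size, lock=False)
view = np.frombuffer(shared, dtype=np.float64).reshape(src.shape)
view[:] = src

# Writing through the ndarray view is visible through the raw buffer:
view[0, 0] = 42.0
assert np.frombuffer(shared, dtype=np.float64)[0] == 42.0
```

Because child processes inherit (or are handed) the shared buffer rather than a pickled copy, a 25 x 10^6 float64 array is materialized only once.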

I have moved the code that makes a copy of this global x_array, processes it, and writes the resulting dataframe out into a worker function, which takes the target directory as an argument.

import itertools
import numpy as np
import ctypes
import multiprocessing as mp

SAMPLE_SIZE = 1000000
VARIABLES = 25

def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)

def to_shared_array(arr, ctype):
    '''Copy a numpy array into a flat, lock-free shared memory Array.'''
    shared_array = mp.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array

def init_pool(shared_array, shape):
    global x_array
    # Recreate x_array using shared memory array:
    x_array = to_numpy_array(shared_array, shape)

def worker(target_dir):
    # Each worker modifies its own private copy of the shared array:
    x_array_copy = np.copy(x_array)
    for i in range(SAMPLE_SIZE):
        # x_array_copy gets filtered/modified
        ...

    # x_array_copy gets saved in target_dir as a dataframe after modification

def main(): 
    main_dir = '.' # for example

    x_dir = ['x1', 'x2']
    y_dir = ['y1', 'y2', 'y3', 'y4']
    z_dir = ['z1', 'z2', 'z3']

    x_directories = [0, 1]
    y_directories = [0, 1, 2, 3]
    z_directories = [0, 1, 2]

    directory_combinations = itertools.product(x_directories, y_directories, z_directories)
    target_dirs =  [main_dir+'/'+x_dir[k]+'/'+y_dir[t]+'/'+z_dir[h] for k, t, h in directory_combinations]

    x_array = np.random.rand(VARIABLES, SAMPLE_SIZE)
    shape = x_array.shape
    # Create the array in shared memory (np.random.rand yields float64,
    # so the matching ctype is c_double):
    shared_array = to_shared_array(x_array, ctypes.c_double)
    # Recreate x_array using the shared memory array as the base:
    x_array = to_numpy_array(shared_array, shape)

    # Create a pool of processes, each initialized with a view of the shared array:
    pool = mp.Pool(12, initializer=init_pool, initargs=(shared_array, shape))

    pool.map(worker, target_dirs)
    pool.close()
    pool.join()

# This is required for Windows:
if __name__ == '__main__':
    main()
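On Python 3.8+, an alternative to the ctypes-backed mp.Array is the multiprocessing.shared_memory module, which lets any process attach to a named block without copying. A minimal sketch (the attach_array helper and the names here are illustrative, not part of the answer above):

```python
import numpy as np
from multiprocessing import shared_memory

def attach_array(name, shape, dtype):
    '''Attach to an existing shared memory block and view it as an ndarray.'''
    shm = shared_memory.SharedMemory(name=name)
    return shm, np.ndarray(shape, dtype=dtype, buffer=shm.buf)

# Create a shared block sized for the sample array and copy the data in:
src = np.random.rand(4, 10)
shm = shared_memory.SharedMemory(create=True, size=src.nbytes)
arr = np.ndarray(src.shape, dtype=src.dtype, buffer=shm.buf)
arr[:] = src

# Any process that knows shm.name can attach without copying:
shm2, view = attach_array(shm.name, src.shape, src.dtype)
assert np.array_equal(view, src)

shm2.close()
shm.close()
shm.unlink()  # free the block once all processes are done
```

Worker processes would attach by name inside a pool initializer, in the same role init_pool plays for the ctypes array above.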
