首页 > 解决方案 > 在 Python 中通过线程/核心/节点并行化 for 循环

问题描述

我打算在 Python 中并行化一个 for 循环,如下所示处理大型数据数组。线程/内核/节点上的并行化如何适合此代码,以及如何实现它?任何建议表示赞赏。谢谢!

所有输入都是具有以下典型大小的 NumPy 数组:

vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3

根据帖子的答案进行编辑:

运行时性能测试表明multiprocessing会导致显着减速以及更高的内存要求。

from timeit import timeit
from multiprocessing import Pool

import numpy as np
from numba import jit

def OP():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))

def f1():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

def f2():
    N = len(matrix_data)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        for result in pool.imap(worker, vector_data):
            pop_array += result

    return pop_array

jit(parallel=True)
def f3():
    N = len(matrix_data)
    pop_array = np.zeros((N, N)) 
    for vector in vector_data:
        vector_2 = np.dot(vector, vector)
        pop_array += (np.exp(-vector_2) / vector_2
                      * np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
    return pop_array

max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)

vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3

vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)

print(f'OP: {timeit(OP, number=10):.3e} sec')
print(f'f1: {timeit(f1, number=10):.3e} sec')
print(f'f2: {timeit(f2, number=10):.3e} sec')
print(f'f3: {timeit(f3, number=10):.3e} sec')

以下是样本运行的运行时间成本:

vector_size = int(1E2)
matrix_size = int(1E1)

OP: 9.527e-02 sec
f1: 2.402e+00 sec (25.21x)
f2: 2.269e+00 sec (23.82x)
f3: 3.414e-02 sec (0.36x)

OP: 43.0 MiB
f1: 41.9 MiB (0.97x)
f2: 41.9 MiB (0.97x)
vector_size = int(1E3)
matrix_size = int(1E2)

OP: 1.420e+00 sec
f1: 1.448e+01 sec (10.20x)
f2: 2.051e+01 sec (14.44x)
f3: 1.213e+00 sec (0.86x)

OP: 43.4 MiB
f1: 119.0 MiB (2.74x)
f2: 43.8 MiB (1x)
vector_size = int(1E4)
matrix_size = int(1E3)

OP: 5.116e+02 sec
f1: 8.902e+02 sec (1.74x)
f2: 6.509e+02 sec (1.27x)

OP: 73.9 MiB
f1: 76402.1 MiB (1033x)
f2: 209.7 MiB (2.84x)

标签: pythonpython-3.xfor-loopparallel-processingmpi4py

解决方案


你可以使用一个. 然后,您可以使用该方法在可迭代对象上运行函数。因此,您可以首先创建要传递给工作人员的函数,以处理迭代中的每个元素:multiprocessing Poolmap

def worker(vector):
    vector_2 = np.dot(vector, vector)
    return (np.exp(-vector_2) / vector_2
            * np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))

现在您可以创建Pool在每个向量上运行此函数。它将返回结果列表,然后我们可以将这些结果添加到pop_array. 像这样:

from multiprocessing import Pool

def par_fun(vector_data, matrix):
    N = len(matrixA)
    pop_array = np.zeros((N, N))

    with Pool() as pool:
        results = pool.map(worker, vector_data)

    for res in results:
        pop_array += res

    return pop_array

另一种可能更整洁的方法是使用imap. 从文档:

请注意,对于非常长的迭代,它可能会导致高内存使用。考虑使用带有显式块大小选项的imap()imap_unordered()以 提高效率。

还:

chunksize参数与map() 方法使用的参数相同。对于很长的迭代,使用较大的chunksize值可以使作业完成比使用默认值快得多1

所以你可以使用这个代码:

def par_fun(vector_data, matrix):
    N = len(matrixA)
    pop_array = np.zeros((N, N))

    pool_size = None
    chunksize = 1

    with Pool(pool_size) as pool:
        for result in pool.imap(worker, vector_data, chunksize=chunksize):
            pop_array += result

    return pop_array

并使用不同pool_sizechunksize价值观来获得最佳结果。


另一种选择是使用线程而不是进程。进程具有可能影响运行时的创建和维护开销。要将代码更改为使用线程,只需将导入更改为使用dummy包装器:

from multiprocessing.dummy import Pool

其余代码保持不变


推荐阅读