python - 在 Python 中通过线程/核心/节点并行化 for 循环
问题描述
我打算在 Python 中并行化一个 for 循环,如下所示处理大型数据数组。线程/内核/节点上的并行化如何适合此代码,以及如何实现它?任何建议表示赞赏。谢谢!
所有输入都是具有以下典型大小的 NumPy 数组:
vector_data (int64): 1M x 3
matrix (float64): 0.1M x 0.1M x 3
根据帖子的答案进行编辑:
运行时性能测试表明multiprocessing
会导致显着减速以及更高的内存要求。
from timeit import timeit
from multiprocessing import Pool
import numpy as np
from numba import jit
def OP():
N = len(matrix_data)
pop_array = np.zeros((N, N))
for vector in vector_data:
vector_2 = np.dot(vector, vector)
pop_array += (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
return pop_array
def worker(vector):
vector_2 = np.dot(vector, vector)
return (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
def f1():
N = len(matrix_data)
pop_array = np.zeros((N, N))
with Pool() as pool:
results = pool.map(worker, vector_data)
for res in results:
pop_array += res
return pop_array
def f2():
N = len(matrix_data)
pop_array = np.zeros((N, N))
with Pool() as pool:
for result in pool.imap(worker, vector_data):
pop_array += result
return pop_array
jit(parallel=True)
def f3():
N = len(matrix_data)
pop_array = np.zeros((N, N))
for vector in vector_data:
vector_2 = np.dot(vector, vector)
pop_array += (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix_data, vector, axes=([2], [0]))))
return pop_array
max_vector_index = 150
vector_size = int(1E3)
matrix_size = int(1E2)
vector_shape = vector_size, 3
matrix_shape = matrix_size, matrix_size, 3
vector_data = np.random.randint(-max_vector_index, max_vector_index+1, vector_shape)
matrix_data = np.random.random(matrix_shape)
print(f'OP: {timeit(OP, number=10):.3e} sec')
print(f'f1: {timeit(f1, number=10):.3e} sec')
print(f'f2: {timeit(f2, number=10):.3e} sec')
print(f'f3: {timeit(f3, number=10):.3e} sec')
以下是样本运行的运行时间成本:
vector_size = int(1E2)
matrix_size = int(1E1)
OP: 9.527e-02 sec
f1: 2.402e+00 sec (25.21x)
f2: 2.269e+00 sec (23.82x)
f3: 3.414e-02 sec (0.36x)
OP: 43.0 MiB
f1: 41.9 MiB (0.97x)
f2: 41.9 MiB (0.97x)
vector_size = int(1E3)
matrix_size = int(1E2)
OP: 1.420e+00 sec
f1: 1.448e+01 sec (10.20x)
f2: 2.051e+01 sec (14.44x)
f3: 1.213e+00 sec (0.86x)
OP: 43.4 MiB
f1: 119.0 MiB (2.74x)
f2: 43.8 MiB (1x)
vector_size = int(1E4)
matrix_size = int(1E3)
OP: 5.116e+02 sec
f1: 8.902e+02 sec (1.74x)
f2: 6.509e+02 sec (1.27x)
OP: 73.9 MiB
f1: 76402.1 MiB (1033x)
f2: 209.7 MiB (2.84x)
解决方案
你可以使用一个. 然后,您可以使用该方法在可迭代对象上运行函数。因此,您可以首先创建要传递给工作人员的函数,以处理迭代中的每个元素:multiprocessing
Pool
map
def worker(vector):
vector_2 = np.dot(vector, vector)
return (np.exp(-vector_2) / vector_2
* np.cos(np.tensordot(matrix, vector, axes=([2], [0]))))
现在您可以创建Pool
在每个向量上运行此函数。它将返回结果列表,然后我们可以将这些结果添加到pop_array
. 像这样:
from multiprocessing import Pool
def par_fun(vector_data, matrix):
N = len(matrixA)
pop_array = np.zeros((N, N))
with Pool() as pool:
results = pool.map(worker, vector_data)
for res in results:
pop_array += res
return pop_array
另一种可能更整洁的方法是使用imap
. 从文档:
请注意,对于非常长的迭代,它可能会导致高内存使用。考虑使用带有显式块大小选项的imap()或imap_unordered()以 提高效率。
还:
chunksize参数与map() 方法使用的参数相同。对于很长的迭代,使用较大的chunksize值可以使作业完成比使用默认值快得多
1
。
所以你可以使用这个代码:
def par_fun(vector_data, matrix):
N = len(matrixA)
pop_array = np.zeros((N, N))
pool_size = None
chunksize = 1
with Pool(pool_size) as pool:
for result in pool.imap(worker, vector_data, chunksize=chunksize):
pop_array += result
return pop_array
并使用不同pool_size
的chunksize
价值观来获得最佳结果。
另一种选择是使用线程而不是进程。进程具有可能影响运行时的创建和维护开销。要将代码更改为使用线程,只需将导入更改为使用dummy
包装器:
from multiprocessing.dummy import Pool
其余代码保持不变
推荐阅读
- neo4j - 使用 Neo4jClient 对运算符进行逻辑分组
- bash - 在 bash 中组织用户名、密码和其他选项的更好方法
- python - Python追加表演搞笑
- c# - 如何在winforms中制作具有重叠点的甘特图
- typescript - 如何根据函数参数数组返回类型化对象?
- cmake - 什么是 cmake 中介子的子项目/包装文件的等价物
- electron - 有没有办法设置电子简单更新器的“禁用=假”
- java - 在 Java 中找不到适合填充 int 的方法
- ios - 使用 swift 5 从远程读取 json 内容并在 Xcode 的主界面上打印
- amazon-web-services - 是否可以将 EasyMock/JMock 用于 AWS Java SDK 模拟和代码覆盖?