python - Multiprocessing a series of loops in Python?
Question
I have a very large array that I need to create, with over 10^7 columns, which need to be filtered/modified according to certain criteria. There is a set of 24 different criteria (24 because of the 2x4x3 combinations), meaning the filtering/modification needs to be performed 24 times, and each result is saved in a different designated directory.
Since this takes very long, I am considering using multiprocessing to speed up the process. Can anyone help me? Here is some sample code:
```python
import itertools
import numpy as np

sample_size = 1000000
variables = 25
x_array = np.random.rand(variables, sample_size)

main_dir = '.'  # base output directory, for example
x_dir = ['x1', 'x2']
y_dir = ['y1', 'y2', 'y3', 'y4']
z_dir = ['z1', 'z2', 'z3']
x_directories = [0, 1]
y_directories = [0, 1, 2, 3]
z_directories = [0, 1, 2]

directory_combinations = itertools.product(x_directories, y_directories, z_directories)

for k, t, h in directory_combinations:
    target_dir = main_dir + '/' + x_dir[k] + '/' + y_dir[t] + '/' + z_dir[h]
    for i in range(sample_size):
        # x_array gets filtered/modified
        # x_array gets saved in target_dir directory as a dataframe after modification
```
Basically, with multiprocessing I would like each of these loops to be handled by one of the 16 cores I have available, or alternatively to speed up each loop iteration by using all 16 cores.
Thanks in advance!
Solution
The code below first creates x_array in shared memory and initializes each process in the pool with a global variable x_array that refers to this shared array.
I have moved the code that makes a copy of this global x_array, processes the copy, and writes the resulting dataframe out into function worker, which is passed the target directory as an argument.
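One detail worth checking when creating the shared Array: the ctypes element type must match the numpy dtype being stored, or the bytes get reinterpreted. A minimal round-trip sketch of that pairing (the names `arr`, `ctype`, `shared`, and `view` here are illustrative, not part of the answer's code):

```python
import ctypes
import multiprocessing as mp

import numpy as np

# A small float64 array, as produced by np.random.rand in the question.
arr = np.random.rand(3, 4)

# Map the numpy dtype to its matching ctypes element type; for float64
# this is ctypes.c_double.
ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
assert ctype is ctypes.c_double

# Allocate the shared buffer and copy the data in through a numpy view.
shared = mp.Array(ctype, arr.size, lock=False)
view = np.frombuffer(shared, dtype=arr.dtype).reshape(arr.shape)
view[:] = arr

# The shared buffer now holds identical values; reads need no extra copy.
assert np.array_equal(view, arr)
```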
```python
import itertools
import numpy as np
import ctypes
import multiprocessing as mp

SAMPLE_SIZE = 1000000
VARIABLES = 25


def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.'''
    arr = np.ctypeslib.as_array(shared_array)
    return arr.reshape(shape)


def to_shared_array(arr, ctype):
    shared_array = mp.Array(ctype, arr.size, lock=False)
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    temp[:] = arr.flatten(order='C')
    return shared_array


def init_pool(shared_array, shape):
    global x_array
    # Recreate x_array using the shared memory array:
    x_array = to_numpy_array(shared_array, shape)


def worker(target_dir):
    # Make a private copy of the shared x_array:
    x_array_copy = np.copy(x_array)
    for i in range(SAMPLE_SIZE):
        # x_array_copy gets filtered/modified
        ...
    # x_array_copy gets saved in target_dir directory as a dataframe after modification


def main():
    main_dir = '.'  # for example
    x_dir = ['x1', 'x2']
    y_dir = ['y1', 'y2', 'y3', 'y4']
    z_dir = ['z1', 'z2', 'z3']
    x_directories = [0, 1]
    y_directories = [0, 1, 2, 3]
    z_directories = [0, 1, 2]
    directory_combinations = itertools.product(x_directories, y_directories, z_directories)
    target_dirs = [main_dir + '/' + x_dir[k] + '/' + y_dir[t] + '/' + z_dir[h]
                   for k, t, h in directory_combinations]

    x_array = np.random.rand(VARIABLES, SAMPLE_SIZE)
    shape = x_array.shape
    # Create the array in shared memory; c_double matches the float64 dtype
    # produced by np.random.rand:
    shared_array = to_shared_array(x_array, ctypes.c_double)
    # Recreate x_array using the shared memory array as the base:
    x_array = to_numpy_array(shared_array, shape)
    # Create a pool of 12 processes, each initialized with a view of the
    # shared array (no per-process copy is made at this point):
    pool = mp.Pool(12, initializer=init_pool, initargs=(shared_array, shape))
    pool.map(worker, target_dirs)


# This guard is required on Windows:
if __name__ == '__main__':
    main()
```