Python multiprocessing - reading/writing multiple 2D arrays

Problem description

I'm trying to speed up a heavy computation on large numpy arrays by applying this tutorial to my use case. In principle, I have one input array and one result array and want to share both across many processes, each of which reads from the input array, adjusts the data, and then writes it to the output array. I don't think I need locks, because the array indices used for reading and writing are unique to each process.

Here is my test example, based on the linked tutorial:

import numpy as np
import multiprocessing as mp

WORKER_DICT = dict()

def shared_array_from_np_array(data_array, init_value=None):
    raw_array = mp.RawArray(np.ctypeslib.as_ctypes_type(data_array.dtype), data_array.size)
    shared_array = np.frombuffer(raw_array, dtype=data_array.dtype).reshape(data_array.shape)
    if init_value:
        np.copyto(shared_array, np.full_like(data_array, init_value))
        return raw_array, shared_array
    else:
        np.copyto(shared_array, data_array)
        return raw_array, shared_array

def init_worker(data_array, result_array):
    WORKER_DICT['data_array'] = data_array
    WORKER_DICT['result_array'] = result_array
    WORKER_DICT['shape'] = data_array.shape

def worker(i, j):
    data = np.frombuffer(WORKER_DICT['data_array']).reshape(WORKER_DICT['shape'])
    result = np.frombuffer(WORKER_DICT['worker_array']).reshape(WORKER_DICT['shape'])
    result[i, j] = np.multiply(data[i, j], 2)
    return

if __name__ == '__main__':
    sh_in_arr, shared_input_array = shared_array_from_np_array(np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]]))
    sh_res_arr, shared_result_array = shared_array_from_np_array(shared_input_array, 0)
    init_args = (sh_in_arr, sh_res_arr)
    with mp.Pool(processes=2, initializer=init_worker, initargs=init_args) as pool:
        pool.map_async(worker, range(shared_input_array.shape[0]))
    print('Input:', shared_input_array)
    print('Output:', shared_result_array)

When I run it, I just get the same array back:

Input: 
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]
Output: 
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]

Am I even on the right track, or is something fundamentally wrong here? Combine Pool.map with shared memory Array in Python multiprocessing looked like an easier approach, but I don't even understand the original question there.

Edit: After a comment about the old Python version, I switched to Python 3.9 and added the actual results.

Tags: arrays, python-3.x, numpy, multiprocessing

Solution


So, after many failures and countless attempts, I came up with this, which seems to work:

import time
import numpy as np
from multiprocessing import Pool, RawArray, cpu_count

# A global dictionary storing the variables passed from the initializer.
GLOBAL_DICT = {}

def init_worker(input_array, array_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    GLOBAL_DICT['input_array'] = input_array
    GLOBAL_DICT['array_shape'] = array_shape

def worker_func(i):
    # Simply doubles all entries
    data = np.frombuffer(GLOBAL_DICT['input_array']).reshape(GLOBAL_DICT['array_shape'])
    return data[i, :] * 2

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    start = time.time()
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]])
    array_shape = my_array.shape
    raw_array = RawArray('d', array_shape[0] * array_shape[1])
    # Wrap raw_array as a numpy array.
    shared_array = np.frombuffer(raw_array).reshape(array_shape)
    # Copy my_array to shared array.
    np.copyto(shared_array, my_array)
    # Start the process pool and do the computation.
    args = (raw_array, array_shape)
    with Pool(processes=cpu_count()-2, initializer=init_worker, initargs=args) as pool:
        result = pool.map(worker_func, range(array_shape[0]))
        print(f'Input:\n{my_array}')
        print(f'Result:\n{np.array(result)}')

This prints:

Input:
[[1 1 2 2]
 [1 1 2 2]
 [3 3 4 4]
 [3 3 4 4]]
Result:
[[2. 2. 4. 4.]
 [2. 2. 4. 4.]
 [6. 6. 8. 8.]
 [6. 6. 8. 8.]]

I suppose there are more efficient or more elegant ways to do this. Ideally, I would write directly into a shared output array, but for now it works.
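For reference, here is a minimal sketch of that direct-write variant, adapted from the question's structure (the helper names and the fixed float64 dtype are illustrative assumptions, not part of the solution above). It also sidesteps the likely culprits in the original attempt: worker(i, j) takes two arguments while map supplies only one, the worker reads the nonexistent key WORKER_DICT['worker_array'], and the map_async result is never fetched, so the with block terminates the pool before the tasks have to finish. A blocking pool.map avoids that last problem entirely:

import numpy as np
from multiprocessing import Pool, RawArray

# Globals populated in each worker process by the initializer.
WORKER_VARS = {}

def init_worker(raw_input, raw_result, shape):
    WORKER_VARS['raw_input'] = raw_input
    WORKER_VARS['raw_result'] = raw_result
    WORKER_VARS['shape'] = shape

def worker(i):
    shape = WORKER_VARS['shape']
    # Both wrappers share memory with the parent process, so writes to
    # `result` are visible in the main process without returning data.
    data = np.frombuffer(WORKER_VARS['raw_input'], dtype=np.float64).reshape(shape)
    result = np.frombuffer(WORKER_VARS['raw_result'], dtype=np.float64).reshape(shape)
    # Each call owns row i exclusively, so no lock is needed.
    result[i, :] = data[i, :] * 2

if __name__ == '__main__':
    my_array = np.array(
        [[1, 1, 2, 2],
         [1, 1, 2, 2],
         [3, 3, 4, 4],
         [3, 3, 4, 4]], dtype=np.float64)
    shape = my_array.shape
    raw_input = RawArray('d', my_array.size)   # 'd' matches float64
    raw_result = RawArray('d', my_array.size)
    # Copy the input data into the shared buffer.
    np.copyto(np.frombuffer(raw_input, dtype=np.float64).reshape(shape), my_array)
    with Pool(processes=2, initializer=init_worker,
              initargs=(raw_input, raw_result, shape)) as pool:
        # map blocks until all rows are done, unlike the unfetched
        # map_async in the original attempt.
        pool.map(worker, range(shape[0]))
    print('Result:')
    print(np.frombuffer(raw_result, dtype=np.float64).reshape(shape))

The trade-off versus the solution above is that nothing has to be pickled back from the workers; the result lives in shared memory the whole time.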

