首页 > 解决方案 > 将函数的输出多处理到单个数组

问题描述

我已经尝试了很长时间了,但我的数组保持不变。

我的数组是TC_p_value,而我要模拟的函数是TC_stats。如果我们正常运行,代码运行良好,但模拟时间太长(大约一个小时)。因此,为了减少处理时间,我将原始数组 (1000x100) 分成 10 个 100x100 的小集合。虽然代码运行没有错误,但我总是得到相同的数组(与最初定义的相同)。我试图定义TC_p_valueglobal,以便每次运行都可以将值分配给数组的特定部分。但是,似乎我在这里做错了(因为不可能在多个处理器上模拟单个数组)或者我的编码逻辑有问题?

任何帮助是极大的赞赏。相同的代码写在下面。

import pingouin as pg # A package to do regression
TC_p_value = np.zeros((Treecover.shape[1],Treecover.shape[2])) #let this array be of size 1000 x 100

def TC_stats(grid_start):
    global TC_p_value
    for lat in tqdm(range(grid_start, grid_start+100)):
        for lon in range(Treecover.shape[2]):
            TC_p_value[lat,lon] = pg.corr(y=Treecover[:, lat,lon].values,
                                  x=np.arange(1,16,1))['p-val'].values[0]

#Multiprocessing starts here
from multiprocessing import Pool
if __name__ == '__main__':
    pool = Pool()
    grid = np.arange(0,1000,100) #Running it in a group of 100, 10 times
    pool.map(TC_stats, grid)
    pool.close()
    pool.join()

标签: pythonarraysnumpymultiprocessing

解决方案


问题是全局定义的数组不是跨进程共享的。因此,您需要使用共享内存。

import ctypes
import numpy as np
import pingouin as pg # A package to do regression

N, M = Treecover.shape[1], Treecover.shape[2]
mp_arr = mp.Array(ctypes.c_double, N * M)
TC_p_value = np.frombuffer(mp_arr.get_obj())
TC_p_value = TC_p_value.reshape((N, M))
#let this array be of size 1000 x 100

def TC_stats(grid_start):
    TC_p_value = np.frombuffer(mp_arr.get_obj())
    TC_p_value = TC_p_value.reshape((N, M))
    for lat in tqdm(range(grid_start, grid_start+100)):
        for lon in range(Treecover.shape[2]):
            TC_p_value[lat,lon] = pg.corr(y=Treecover[:, lat,lon].values,
                                  x=np.arange(1,16,1))['p-val'].values[0]

def init(shared_arr_):
    global mp_arr
    mp_arr = shared_arr_

#Multiprocessing starts here
from multiprocessing import Pool
if __name__ == '__main__':
    pool = Pool(initializer=init, initargs=(mp_arr,))
    grid = np.arange(0,1000,100) #Running it in a group of 100, 10 times
    pool.map_async(TC_stats, grid)
    pool.close()
    pool.join()

我用一些修改过的玩具示例运行了上面的代码,它工作了。

参考:在共享内存中使用 numpy 数组进行多处理


推荐阅读