首页 > 解决方案 > 关于使用 python 多处理从目标函数返回 numpy 数组的建议?

问题描述

我对 python 多处理完全陌生,对大量的在线资源有点不知所措,所以我有点想从这里获得更清晰的方法。我的代码如下所示:向前和向后这两个函数在计算上非常昂贵。在我的输入数据集上,每个大约需要 13 分钟。我想同时计算两个矩阵(向前和向后,参见 decode() 函数中的第 3 行和第 4 行代码)。我查看了一些在线教程,我想我可以使用 multiprocessing.process 来做到这一点。但是,我不确定如何检索 numpy 数组。我知道有诸如 Queue、Array 之类的东西,但它们的使用似乎非常有限,而且似乎不适合这里。提前致谢!'''

def forward(self, emis):
    # Given the observed haplotype, compute its forward matrix
    f = np.full((self.n1+self.n2, self.numSNP), np.nan)
    # initialization
    f[:,0] = (-math.log(self.n1+self.n2) + emis[0]).flatten()

     # fill in forward matrix
    for j in range(1, self.numSNP):
        T = self.transition(self.D[j])
        # using axis=1, logsumexp sum over each column of the transition matrix
        f[:, j] = emis[j] + logsumexp(f[:,j-1][:,np.newaxis] + T, axis=0)
    return f


#@profile
def backward(self, emis):
    # Given the observed haplotype, compute its backward matrix
    b = np.full((self.n1+self.n2, self.numSNP), np.nan)
    # initialization
    b[:, self.numSNP-1] = np.full(self.n1+self.n2, 0)

    for j in range(self.numSNP-2, -1, -1):
        T = self.transition(self.D[j+1])
        b[:,j] = logsumexp(T + emis[j+1] + b[:,j+1], axis=1)
    return b


#@profile
def decode(self, obs):
    # infer hidden state of each SNP sites in the given haplotype
    # state[j] = 0 means site j was most likely copied from population 1 
    # and state[j] = 1 means site j was most likely copies from population 2

    start = time.time()
    emis = self.emissionALL(obs)
    f = self.forward(emis)
    b = self.backward(emis)
    end= time.time()
    print(f'uncached version takes time {end-start}')
    print(f'forward probability:{logsumexp(f[:,-1])}')
    print(f'backward probability:{logsumexp(-math.log(self.n1+self.n2)+emis[0]+b[:,0])}')
    return 0

'''

标签: pythonmultiprocessingpython-multiprocessingmultiprocessing-manager

解决方案


如果您只是使用矩阵,我不确定 Array 对多处理有什么限制。它不完整,但这就是想法。

from multiprocessing.sharedctypes import RawArray

#make some empty arrays 
yourMat = RawArray('d', X_size) 
resultMat = RawArray('d', X_size) 

...
ptemp=multiprocessing.Process(target=backward, args=(yourMat ,resultMat ))
ptemp.daemon=True
ptemp.start()

...

data = np.frombuffer(yourMat, dtype=np.float64)
#do something with data
resultMat [i:j] = data 

...

#get the data
results = np.frombuffer(resultMat , dtype='i')

您可以查看这篇文章以获取完整示例:Use numpy array in shared memory for multiprocessing


推荐阅读