首页 > 解决方案 > 使用带有回调函数的 python pool.apply_async() 的最佳实践

问题描述

使用 pool.apply_async() 时，汇总（累积）每个进程返回结果的最佳实践是什么？应该使用 job.get() 还是 job.wait()？job.ready() 和 job.successful() 又该如何使用？

能否让每个进程把自己的结果直接累加到一个全局变量中，从而避免出现由单个进程长时间处于 S（睡眠）状态、负责汇总所有进程结果的情况？

import multiprocessing
import os
import numpy as np

def prepare_data_fill_arrays(simNum, chrLong):
    """Worker task: build the result payload for one (simulation, chromosome) pair.

    Logs the worker's PID, then returns a 4-element list
    ``[simNum, chrLong, array1, array2]``. Both arrays are separate
    ``(1, 10)`` float arrays, with every entry equal to ``simNum``.
    """
    message = 'prepare_data_fill_arrays worker id:%s simNum:%d %s' % (
        str(os.getpid()), simNum, chrLong)
    print(message)
    # Allocate each array on its own so the caller never receives two
    # references to the same buffer.
    first = np.ones((1, 10)) * simNum
    second = np.ones((1, 10)) * simNum
    return [simNum, chrLong, first, second]

def accumulate_result(arrayList, accumulatedArray1, accumulatedArray2):
    """Add one worker result into row ``simNum - 1`` of both accumulators.

    Args:
        arrayList: ``[simNum, chrLong, array1, array2]``, as returned by
            ``prepare_data_fill_arrays``. ``array1`` and ``array2`` must each
            hold exactly one accumulator row's worth of values
            (e.g. shape ``(1, 10)`` for a 10-column accumulator).
        accumulatedArray1: 2-D array updated in place. Row ``i`` holds the
            running sum for simulation ``i + 1``.
        accumulatedArray2: same as ``accumulatedArray1``, for ``array2``.

    Returns:
        ``(simNum, chrLong)`` of the result that was added.

    Raises:
        IndexError: if ``simNum`` is outside ``1..len(accumulatedArray1)``.
            Without this check, a ``simNum`` of 0 would silently wrap to the
            last row (index -1) and corrupt another simulation's totals.
    """
    simNum, chrLong, array1, array2 = arrayList[:4]
    n_rows = len(accumulatedArray1)
    if not 1 <= simNum <= n_rows:
        raise IndexError('simNum %d out of range 1..%d' % (simNum, n_rows))
    # Sum rather than overwrite: every chromosome of a simulation lands in the
    # same row, and with plain assignment only the last callback to run would
    # survive. Flatten first because an in-place add into a (10,) row cannot
    # broadcast a (1, 10) operand.
    accumulatedArray1[simNum - 1] += np.reshape(array1, -1)
    accumulatedArray2[simNum - 1] += np.reshape(array2, -1)
    return simNum, chrLong


if __name__ == '__main__':
    import sys
    import traceback

    numofSimulations = 10
    chromNamesList = ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chrX', 'chr8', 'chr9', 'chr10', 'chr11',
                      'chr12', 'chr13', 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr20', 'chrY', 'chr19', 'chr22',
                      'chr21', 'chrM']

    # Simulation ids are 1-based, so id k maps to row k - 1 of the
    # (numofSimulations, 10) accumulators. range(0, n + 1) would create n + 1
    # ids for n rows, and id 0 would wrap onto id n's row.
    sim_nums = range(1, numofSimulations + 1)
    sim_num_chr_tuples = ((sim_num, chrLong) for sim_num in sim_nums for chrLong in chromNamesList)

    jobs = []
    failed = []  # exceptions raised while accumulating a result
    accumulatedArray1 = np.zeros((numofSimulations, 10))
    accumulatedArray2 = np.zeros((numofSimulations, 10))

    def accumulateArray(arrayList):
        """apply_async callback: fold one worker result into the accumulators.

        Callbacks run in the parent process, on the pool's result-handling
        thread. The accumulator arrays can therefore be updated directly,
        without any inter-process sharing. Errors are reported rather than
        raised: an exception escaping a callback is not delivered to
        ``job.get()`` and can stall the pool's result handling.
        """
        try:
            simNum, chrLong = accumulate_result(arrayList, accumulatedArray1, accumulatedArray2)
        except Exception as e:
            print("Exception: %s" % (e), file=sys.stderr)
            traceback.print_exc()
            failed.append(e)
        else:
            print('ACCUMULATION simNum:%d chrLong:%s' % (simNum, chrLong))

    with multiprocessing.Pool() as pool:
        for simNum, chrLong in sim_num_chr_tuples:
            jobs.append(pool.apply_async(prepare_data_fill_arrays,
                                         args=(simNum, chrLong),
                                         callback=accumulateArray))
        # Use get() rather than wait(). get() also blocks until the job is
        # done, but it re-raises any exception raised in the worker. All
        # get() calls must finish inside this with-block, because
        # Pool.__exit__ calls terminate() and would kill unfinished jobs.
        # CPython runs a job's callback before marking the job ready, so the
        # accumulators are complete once this loop ends.
        for job in jobs:
            job.get()

    if failed:
        print('WARNING: %d result(s) could not be accumulated' % len(failed), file=sys.stderr)

    print("accumulatedArray1:%s" %(accumulatedArray1))
    print("accumulatedArray2:%s" %(accumulatedArray2))

标签: python, multiprocessing, pool

解决方案


推荐阅读