python - 使用带有回调函数的 python pool.apply_async() 的最佳实践
问题描述
对于pool.apply_async()
累积来自每个过程的结果的最佳实践是什么?是job.get()
还是job.wait()
?)job.ready(
和?job.successful()
是否可以在每个进程中将每个结果累积到一个全局变量中,这样我们就不会长时间处于 S(睡眠)模式的一个进程试图累积来自每个进程的结果?
import multiprocessing
import os
import numpy as np
def prepare_data_fill_arrays(simNum,chrLong):
print('prepare_data_fill_arrays worker id:%s simNum:%d %s' %(str(os.getpid()),simNum,chrLong))
arrayList=[]
array1=np.ones((1,10))*simNum
array2=np.ones((1,10))*simNum
arrayList.append(simNum)
arrayList.append(chrLong)
arrayList.append(array1)
arrayList.append(array2)
return arrayList
if __name__ == '__main__':
numofSimulations = 10
chromNamesList = ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chrX', 'chr8', 'chr9', 'chr10', 'chr11',
'chr12', 'chr13', 'chr14', 'chr15', 'chr16', 'chr17', 'chr18', 'chr20', 'chrY', 'chr19', 'chr22',
'chr21', 'chrM']
sim_nums = range(0, numofSimulations + 1)
sim_num_chr_tuples = ((sim_num, chrLong) for sim_num in sim_nums for chrLong in chromNamesList)
jobs=[]
accumulatedArray1=np.zeros((numofSimulations,10))
accumulatedArray2=np.zeros((numofSimulations,10))
def accumulateArray(arrayList):
try:
simNum=arrayList[0]
chrLong=arrayList[1]
array1 = arrayList[2]
array2 = arrayList[3]
accumulatedArray1[simNum-1]=array1
accumulatedArray2[simNum-1]=array2
print('ACCUMULATION simNum:%d chrLong:%s' %(simNum,chrLong))
except Exception as e:
print("Exception: %s" %(e))
with multiprocessing.Pool() as pool:
for simNum, chrLong in sim_num_chr_tuples:
jobs.append(pool.apply_async(prepare_data_fill_arrays,
args=(simNum,chrLong,),
callback=accumulateArray))
for job in jobs:
job.get()
print("accumulatedArray1:%s" %(accumulatedArray1))
print("accumulatedArray2:%s" %(accumulatedArray2))
解决方案
推荐阅读
- python - 为什么我的 python 代码显示值错误?
- express - peerjs 在 loaclhost 上工作,但不在 heroku 上?
- firebase - 没有为“AuthService”类型定义方法“UserUpdateInfo”
- c - 实现try-catch-finally语法时如何解决bison shift/reduce?
- amazon-web-services - Next.js:当托管在 AWS Cloudfront 上时,如何使链接与导出的站点一起使用?
- prometheus - 如何加入两个矩阵,然后在 Prometheus 中找到速率
- r - 日期格式错误来源供应是否有 R 功能
- javascript - TypeError:不能混合 BigInt 和其他类型,使用显式转换
- c++ - 如何在不使用任何同步方法的情况下同步线程?
- java - 搜索带下划线的字符串