首页 > 解决方案 > python - 如何通过pool.imap()函数传递数据以外的参数以在python中进行多处理?

问题描述

我正在研究高光谱图像。为了减少图像中的噪声,我使用 pywt 包进行小波变换。当我正常执行此操作(串行处理)时,它工作顺利。但是当我尝试使用多核对图像进行小波变换来实现并行处理时,我必须传递某些参数,例如

  1. 小波族
  2. 阈值
  3. 阈值技术(硬/软)

但是我无法使用池对象传递这些参数,当我使用 pool.imap() 时,我只能将数据作为参数传递。但是当我使用 pool.apply_async() 时,它需要更多的时间,而且输出的顺序也不一样。我在这里添加代码以供参考:

import matplotlib.pyplot as plt
import numpy as np
import multiprocessing as mp
import os
import time
from math import log10, sqrt
import pywt
import tifffile

def spec_trans(d,wav_fam,threshold_val,thresh_type):
  
  data=np.array(d,dtype=np.float64)
  data_dec=decomposition(data,wav_fam)
  data_t=thresholding(data_dec,threshold_val,thresh_type)
  data_rec=reconstruction(data_t,wav_fam)
  
  return data_rec

if __name__ == '__main__':
    
    #input
    X=tifffile.imread('data/Classification/university.tif')
    #take paramaters
    threshold_val=float(input("Enter the value for image thresholding: "))
    print("The available wavelet functions:",pywt.wavelist())
    wav_fam=input("Choose a wavelet function for transformation: ")
    threshold_type=['hard','soft']
    print("The available wavelet functions:",threshold_type)
    thresh_type=input("Choose a type for threshholding technique: ")

    start=time.time()
    p = mp.Pool(4)
    jobs=[]
    for dataBand in xmp:
      jobs.append(p.apply_async(spec_trans,args=(dataBand,wav_fam,threshold_val,thresh_type)))
    transformedX=[]
    for jobBit in jobs:
      transformedX.append(jobBit.get())
    end=time.time()
    p.close()


此外,当我使用“软”技术进行阈值处理时,我面临以下错误:

C:\Users\Sawon\anaconda3\lib\site-packages\pywt\_thresholding.py:25: RuntimeWarning: invalid value encountered in multiply
  thresholded = data * thresholded

串行执行和并行执行的结果或多或少是相同的。但在这里我得到的结果略有不同。任何修改代码的建议都会有所帮助谢谢

标签: pythonimagenumpymultiprocessingpool

解决方案


[这不是问题的直接答案,而是比通过小评论框尝试以下更清晰的后续查询]

作为快速检查,将迭代器计数器传递给 spec_trans 并将其返回(以及您的结果) - 并将其推送到单独的列表中,transformXseq 或其他东西 - 然后与您的输入序列进行比较。IE

def spec_trans(d,wav_fam,threshold_val,thresh_type, iCount):

    data=np.array(d,dtype=np.float64)
    data_dec=decomposition(data,wav_fam)
    data_t=thresholding(data_dec,threshold_val,thresh_type)
    data_rec=reconstruction(data_t,wav_fam)

    return data_rec, iCount

然后在 main

jobs=[]
iJobs = 0
for dataBand in xmp:
   jobs.append(p.apply_async(spec_trans,args=(dataBand,wav_fam,threshold_val,thresh_type, iJobs)))
   iJobs = iJobs + 1 

transformedX=[]
transformedXseq=[]
for jobBit in jobs:
    res = jobBit.get()
    transformedX.append(res[0])
    transformedXseq.append(res[1])

...并检查列表 transformXseq 以查看您是否按照提交的顺序收集了作业。它应该匹配!


推荐阅读