首页 > 解决方案 > 无法向 concurrrent.futures.Executor.map() 发送多个参数

问题描述

我正在尝试结合这两个 SO 答案中提供的解决方案 -使用线程将数组切片成块并对每个块执行计算并将返回的数组重新组合成一个数组并将多个参数传递给 concurrent.futures.Executor.map?. 我有一个 numpy 数组,我将其分块成段,我希望将每个块发送到一个单独的线程,并与原始数组的块一起发送一个附加参数。这个额外的参数是一个常数,不会改变。performCalc 是一个函数,它将接受两个参数 - 一个是原始 numpy 数组的块,另一个是常量。

我尝试的第一个解决方案

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)

    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]

    n_jobs = psutil.cpu_count(logical=False)

    chunk = np.array_split(grid_points,n_jobs,axis=0)


   x = ThreadPoolExecutor(max_workers=n_jobs) 
   maxDistance = 4.3
   func = partial(performCalc,chunk)
   args = [chunk,maxDistance]
   # This prints 4.3 twice although there are four cores in the system
   results = x.map(func,args)
   # This prints 4.3 four times correctly
   results1 = x.map(performTest,chunk)

  def performCalc(chunk,maxDistance):
      print(maxDistance)
      return chunk

 def performTest(chunk):
     print("test")

 main()

因此,即使系统中的核心数为 4,performCalc() 也会打印 4.3 两次。而 performTest() 会正确打印四次 test。我无法弄清楚这个错误的原因。

此外,我确定我设置 for itertools.partial 调用的方式不正确。

1)原始numpy数组有四个块。

2) 每个块要与 maxDistance 配对并发送到 performCalc()

3) 将有四个线程打印 maxDistance 并返回总结果的一部分,这些结果将在一个数组中返回

我哪里错了?

更新

我也尝试使用 lambda 方法

results = x.map(lambda p:performCalc(*p),args)

但这什么也没打印。

标签: pythonpython-3.xconcurrencyiteratormap-function

解决方案


使用用户 mkorvas 提供的解决方案,如下所示 -如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?我能够解决我的问题,如这里的解决方案所示 -

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
   testThread()

def testThread():

   minLat = -65.76892
   maxLat =  66.23587
   minLon =  -178.81404
   maxLon =  176.2949
   latGrid = np.arange(minLat,maxLat,0.05)
   lonGrid = np.arange(minLon,maxLon,0.05)
   print(latGrid.shape,lonGrid.shape)
   gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
   grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
   print(grid_points.shape)
   n_jobs = psutil.cpu_count(logical=False)
   chunk = np.array_split(grid_points,n_jobs,axis=0)
   x = ThreadPoolExecutor(max_workers=n_jobs) 


  maxDistance = 4.3
  func = partial(performCalc,maxDistance)

  results = x.map(func,chunk)


 def performCalc(maxDistance,chunk):

     print(maxDistance)
     return chunk

main()

显然需要做的是(我不知道为什么,也许有人可以在另一个答案中澄清)是您需要将输入顺序切换到函数 performCalc()

如此处所示 -

      def performCalc(maxDistance,chunk):

          print(maxDistance)
          return chunk

推荐阅读