python - 无法向 concurrrent.futures.Executor.map() 发送多个参数
问题描述
我正在尝试结合这两个 SO 答案中提供的解决方案 -使用线程将数组切片成块并对每个块执行计算并将返回的数组重新组合成一个数组并将多个参数传递给 concurrent.futures.Executor.map?. 我有一个 numpy 数组,我将其分块成段,我希望将每个块发送到一个单独的线程,并与原始数组的块一起发送一个附加参数。这个额外的参数是一个常数,不会改变。performCalc 是一个函数,它将接受两个参数 - 一个是原始 numpy 数组的块,另一个是常量。
我尝试的第一个解决方案
import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
def main():
testThread()
def testThread():
minLat = -65.76892
maxLat = 66.23587
minLon = -178.81404
maxLon = 176.2949
latGrid = np.arange(minLat,maxLat,0.05)
lonGrid = np.arange(minLon,maxLon,0.05)
gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
n_jobs = psutil.cpu_count(logical=False)
chunk = np.array_split(grid_points,n_jobs,axis=0)
x = ThreadPoolExecutor(max_workers=n_jobs)
maxDistance = 4.3
func = partial(performCalc,chunk)
args = [chunk,maxDistance]
# This prints 4.3 twice although there are four cores in the system
results = x.map(func,args)
# This prints 4.3 four times correctly
results1 = x.map(performTest,chunk)
def performCalc(chunk,maxDistance):
print(maxDistance)
return chunk
def performTest(chunk):
print("test")
main()
因此,即使系统中的核心数为 4,performCalc() 也会打印 4.3 两次。而 performTest() 会正确打印四次 test。我无法弄清楚这个错误的原因。
此外,我确定我设置 for itertools.partial 调用的方式不正确。
1)原始numpy数组有四个块。
2) 每个块要与 maxDistance 配对并发送到 performCalc()
3) 将有四个线程打印 maxDistance 并返回总结果的一部分,这些结果将在一个数组中返回
我哪里错了?
更新
我也尝试使用 lambda 方法
results = x.map(lambda p:performCalc(*p),args)
但这什么也没打印。
解决方案
使用用户 mkorvas 提供的解决方案,如下所示 -如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?我能够解决我的问题,如这里的解决方案所示 -
import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
def main():
testThread()
def testThread():
minLat = -65.76892
maxLat = 66.23587
minLon = -178.81404
maxLon = 176.2949
latGrid = np.arange(minLat,maxLat,0.05)
lonGrid = np.arange(minLon,maxLon,0.05)
print(latGrid.shape,lonGrid.shape)
gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
print(grid_points.shape)
n_jobs = psutil.cpu_count(logical=False)
chunk = np.array_split(grid_points,n_jobs,axis=0)
x = ThreadPoolExecutor(max_workers=n_jobs)
maxDistance = 4.3
func = partial(performCalc,maxDistance)
results = x.map(func,chunk)
def performCalc(maxDistance,chunk):
print(maxDistance)
return chunk
main()
显然需要做的是(我不知道为什么,也许有人可以在另一个答案中澄清)是您需要将输入顺序切换到函数 performCalc()
如此处所示 -
def performCalc(maxDistance,chunk):
print(maxDistance)
return chunk
推荐阅读
- php - 使用 INT 和字符串的 PHP 循环 - 这是如何工作的?
- nestjs - Nestjs用参数创建一个模块的几个实例
- reactjs - D3 旭日形图中的标签环绕
- objective-c - 如何在 Mac OS 上从框架文件中查看接口规范
- amazon-s3 - 我在哪里编写 kafka 连接接收器自定义分区器的代码?
- flutter - 从flutter-web向fastapi服务器发出http get请求时出错
- c# - 使用命名管道/流转换和输出到流不包含流持续时间
- javascript - 使用数据属性重新排序子元素并应用显示/隐藏
- android - Android 应用程序通过点击 nfc 标签自动启动特定活动
- plc - 在 Twincat 3 中获取枚举的字符串值