python - 在 python 3.4.7 的 pool.map 函数中添加额外的随机参数作为参数
问题描述
我想在大型数据集上使用多处理来查找两列的乘积,并使用参数中的给定参数过滤数据集。我构建了一个测试集,但我无法让多处理在这个集上工作。
首先,我试图在parallelize_dataframe 函数中划分数据集,然后在subset_col 函数中应用乘法函数和过滤函数。稍后我将完整的数据集附加回parallelize_dataframe。
import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Lock
df = pd.DataFrame({'col1': [1, 0, 1, 1, 1, 0, 0, 1, 0, 1],
'col2': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
'col3': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'col4': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})
def subset_col(df, p):
print("Working with number: " + str(p))
df[col5] = df[col3]*df[col4]
df= df[df['col1'] == p]
def parallelize_dataframe(df, p, func, n_cores=80):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split, p))
pool.close()
pool.join()
return df
df3 = parallelize_dataframe(df,1,subset_col)
结果应该是 col3 和 col4 的乘积,其中 col1 用一个值过滤。但我总是得到一个错误说:
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in parallelize_dataframe
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
但是,如果我从所有函数中删除过滤器“p”,它就可以正常工作。有人可以帮我调试吗?
解决方案
从multiprocessing.Pool.map的官方文档中,它“只支持一个可迭代参数”。因此,您需要更改接口subset_col
以采用单个参数。此外,您忘记创建列字符串,导致名称错误。为了减少计算量,您应该在乘法之前进行过滤。然后应该返回一个值,除非您的函数仅通过副作用操作(我假设您不想要这个,因为您连接了池结果)。
def subset_col(pair):
df, p = pair
print("Working with number: " + str(p))
df = df[df['col1'] == p].copy()
df['col5'] = df['col3']
return df
接下来,我们将需要修复您的调用方式pool.map
,因为根据您正在执行的操作,它应该只需要 2 个参数(第三个,最后一个参数是 chunksize)。由于您希望p
每个进程都使用相同的值,因此我们将 zipdfs
与每个的重复值一起压缩p
。此外,考虑使用上下文管理器来处理关闭资源。
def parallelize_dataframe(df, p, func, n_cores=None):
if n_cores is None:
n_cores = os.cpu_count()
dfs = np.array_split(df, n_cores)
pairs = zip(dfs, itertools.repeat(p))
with Pool(n_cores) as pool:
result = pool.map(func, pairs)
df = pd.concat(result)
return df
这现在正确地返回了新的数据框。但我绝对怀疑你有一台 80 核的机器。考虑实现n_cores=None
让Python通过使用os.cpu_count
df3 = parallelize_dataframe(df, 1, subset_col)
根据您对Pool.starmap
变体的要求:
def subset_col(df, p):
# remove unpacking line
...
def parallelize_dataframe(df, p, func, n_cores=None):
...
# change `pool.map(...)` to `pool.starmap(...)`
...
但是,您应该注意,Pool
不提供两者都是惰性评估版本imap
的imap_unordered
替代方案starmap
,无论是否保留顺序都不同。
推荐阅读
- c# - 将文件上传到 Azure Blob 存储会导致大小为 0
- f# - F# - 如何一起定义多个泛型函数
- reactjs - 尝试更新状态时遇到错误
- numpy - 使用 np.lib.stride_tricks.as_strided 的含义
- unity3d - 如何添加额外的相机,以便在我使用电影相机时使用脚本 onBecomeInvisible() 的对象不会被破坏?
- angular - 添加自定义属性以增强联合登录
- javascript - 模拟 axios 接收空对象
- amazon-web-services - AWS CDK -- 错误:在 Azure DevOps 管道中找不到模块“@aws-cdk/cloud-assembly-schema”
- asp.net - 您如何使用没有明显扩展名的网站 URL
- flutter - 在 Flutter 中缩小文本基线以下的区域