python - Python multiprocess.Pool.map 无法处理大型数组。
问题描述
这是我用来在 pandas.DataFrame 对象的行上对应用函数进行 parrellize 的代码:
from multiprocessing import cpu_count, Pool
from functools import partial
def parallel_applymap_df(df: DataFrame, func, num_cores=cpu_count(),**kargs):
partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
pool = Pool(num_cores)
series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
pool.close()
pool.join()
return series
它适用于 200 000 行的子样本,但是当我尝试完整的 200 000 000 个示例时,我收到以下错误消息:
~/anaconda3/lib/python3.6/site-packages/multiprocess/connection.py in _send_bytes(self, buf)
394 n = len(buf)
395 # For wire compatibility with 3.2 and lower
—> 396 header = struct.pack("!i", n)
397 if n > 16384:
398 # The payload is large so Nagle's algorithm won't be triggered
error: 'i' format requires -2147483648 <= number <= 2147483647
由行生成:
series = pd.concat(pool.map(partial(apply_wrapper, func=func, **kargs), df_split))
这很奇怪,因为我用来并行化未在 pandas 中矢量化的操作的稍微不同的版本(如 Series.dt.time)适用于相同数量的行。这是示例作品的版本:
def parallel_map_df(df: DataFrame, func, num_cores=cpu_count()):
partitions = np.linspace(0, len(df), num_cores + 1, dtype=np.int64)
df_split = [df.iloc[partitions[i]:partitions[i + 1]] for i in range(num_cores)]
pool = Pool(num_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
解决方案
错误本身来自多处理在池中的不同工作人员之间建立连接的事实。要向该工作人员发送数据或从该工作人员发送数据,数据必须以字节为单位发送。第一步是为将发送给工作人员的消息创建一个标头。此标头包含作为整数的缓冲区长度。但是,如果缓冲区的长度大于可以用整数表示的长度,则代码将失败并产生您显示的错误。
我们缺少重现您的问题所需的数据和大量代码,因此我将提供一个最小的工作示例:
import numpy
import pandas
import random
from typing import List
from multiprocessing import cpu_count, Pool
def parallel_applymap_df(
input_dataframe: pandas.DataFrame, func, num_cores: int = cpu_count(), **kwargs
) -> pandas.DataFrame:
# Create splits in the dataframe of equal size (one split will be processed by one core)
partitions = numpy.linspace(
0, len(input_dataframe), num_cores + 1, dtype=numpy.int64
)
splits = [
input_dataframe.iloc[partitions[i] : partitions[i + 1]]
for i in range(num_cores)
]
# Just for debugging, add metadata to each split
for index, split in enumerate(splits):
split.attrs["split_index"] = index
# Create a pool of workers
with Pool(num_cores) as pool:
# Map the splits in the dataframe to workers in the pool
result: List[pandas.DataFrame] = pool.map(func, splits, **kwargs)
# Combine all results of the workers into a new dataframe
return pandas.concat(result)
if __name__ == "__main__":
# Create some test data
df = pandas.DataFrame([{"A": random.randint(0, 100)} for _ in range(200000000)])
def worker(df: pandas.DataFrame) -> pandas.DataFrame:
# Print the length of the dataframe being processed (for debugging)
print("Working on split #", df.attrs["split_index"], "Length:", len(df))
# Do some arbitrary stuff to the split of the dataframe
df["B"] = df.apply(lambda row: f"test_{row['A']}", axis=1)
# Return the result
return df
# Create a new dataframe by applying the worker function to the dataframe in parallel
df = parallel_applymap_df(df, worker)
print(df)
请注意,这可能不是最快的方法。如需更快的替代方案,请查看swifter
或dask
。
推荐阅读
- python - 如何在此代码中使用更少的代码行?(出自著名的米棋寓言)
- android - 通过kotlin创建应用时出现非法参数异常
- angular - Angular 11 中的错误 - 试图从后面获取数据并在屏幕上实现它 - 无法读取未定义的属性“值”
- java - Android 10 打开失败:ENOENT(没有这样的文件或目录)
- java - Google Photo Api Upload Error : "code": 3, "Failed: There was an error while trying to create this media item
- r - 将列表放入R中的单个数据框中
- erlang - 如何在混合项目设置中启动 Erlang shell?
- arduino - 将 Arduino Nano PWM 定时器 1 设置为 10 位和 16 kHz 的代码
- c# - 为什么我的代码说我的对象没有定义?
- vba - 如何从 OnContentChanged 事件中获取 LibreOffice calc 中的 XRange?