首页 > 解决方案 > 完成工作时的无限池进程,在 python 中,函数应用于一系列 pandas.DataFrame

问题描述

我将 apandas.DataFrame分成 6 个具有相似形状行的部分:118317,118315,...为了平衡工作并尊重在groupby字段上使用的数据的完整性。

这 6 个部分pandas.DataFrame存储在一个list.

以下是每个并行应用的功能。

def compute_recency (df):
    recency = df.groupby('name').apply(lambda x: x['date'] - x['date'].shift()).fillna(0).reset_index()
    df = df.join(recency.set_index('level_1'), rsuffix= '_f')
    return df

然后我将这些过程并行化为:

import multiprocessing as mp

cores=mp.cpu_count()

pool = mp.Pool(cores)

df_out = pool.map(compute_recency, list_of_6_dataframes)

pool.close()
pool.join()

问题是它一直在 jupyter 实验室的 notebook => 中计算[*],而我可以在资源监视器中看到 CPU 现在是“空闲的”,我的意思是它们不像一开始那样处于 100%。

请注意,如果我使用以下功能:

def func(df):
    return df.shape

它工作得很好而且很快,[*]永远不会。

所以我想问题出在功能上compute_recency,但我不明白为什么。
你能帮助我吗 ?


熊猫版本:0.23.4
Python 版本:3.7.4

标签: pythonpandasmultiprocessing

解决方案


在这里看到可能导致问题的原因有点困难。也许因为您正在使用multiprocessing可能将您的数据分成由创建的组groupby?然后使用多处理处理每个组?

from multiprocessing import Pool
groups = [x for _, x in df.groupby("name")]

def add_new_col(x):
    x['new'] = x['date'] - x['date'].shift().fillna(0)
    return x

p = Pool()
groups = p.map(add_new_col, groups)
df = pd.concat(groups, ignore_index=True)
p.close()
p.join()

顺便说一句,关于您的原始代码。p.map将返回数据框列表而不是数据框。这就是为什么我习惯pd.concat在最后结合结果。


推荐阅读