python - 完成工作时的无限池进程,在 python 中,函数应用于一系列 pandas.DataFrame
问题描述
我将 apandas.DataFrame
分成 6 个具有相似形状行的部分:118317,118315,...
为了平衡工作并尊重在groupby
字段上使用的数据的完整性。
这 6 个部分pandas.DataFrame
存储在一个list
.
以下是每个并行应用的功能。
def compute_recency (df):
recency = df.groupby('name').apply(lambda x: x['date'] - x['date'].shift()).fillna(0).reset_index()
df = df.join(recency.set_index('level_1'), rsuffix= '_f')
return df
然后我将这些过程并行化为:
import multiprocessing as mp
cores=mp.cpu_count()
pool = mp.Pool(cores)
df_out = pool.map(compute_recency, list_of_6_dataframes)
pool.close()
pool.join()
问题是它一直在 jupyter 实验室的 notebook => 中计算[*]
,而我可以在资源监视器中看到 CPU 现在是“空闲的”,我的意思是它们不像一开始那样处于 100%。
请注意,如果我使用以下功能:
def func(df):
return df.shape
它工作得很好而且很快,[*]
永远不会。
所以我想问题出在功能上compute_recency
,但我不明白为什么。
你能帮助我吗 ?
熊猫版本:0.23.4
Python 版本:3.7.4
解决方案
在这里看到可能导致问题的原因有点困难。也许因为您正在使用multiprocessing
可能将您的数据分成由创建的组groupby
?然后使用多处理处理每个组?
from multiprocessing import Pool
groups = [x for _, x in df.groupby("name")]
def add_new_col(x):
x['new'] = x['date'] - x['date'].shift().fillna(0)
return x
p = Pool()
groups = p.map(add_new_col, groups)
df = pd.concat(groups, ignore_index=True)
p.close()
p.join()
顺便说一句,关于您的原始代码。p.map
将返回数据框列表而不是数据框。这就是为什么我习惯pd.concat
在最后结合结果。
推荐阅读
- go - 如何撤消“go mod init”
- amazon-web-services - 如何确认我的 AWS Lambda 函数更新成功?
- php - Python 到前端的通信
- oracle - 如何调用正在运行的 SQL 脚本的文件名?
- spring-boot - 主复合键 ID 上的 org.hibernate.NonUniqueObjectException
- string - Z3py 模型返回 EMPTY
- javascript - 检测 JavaScript 中动态创建的输入的 id
- demandware - 在内容资产中创建会话重定向链接
- flutter - 键盘出现时颤振状态重建
- javascript - Nuxt.js 根据嵌套更改数据属性中 json 数据的格式