首页 > 解决方案 > 并行化 groupby:将函数同时应用于 groupby 对象

问题描述

我想按操作执行分组,并为每个组估计一个线性模型。

编写一个函数然后使用for 循环非常容易,但是有点慢。

这是一个玩具示例,但它确实起到了作用。在您看来,使这种并行化的“最佳”方式是什么?

一个直观的例子:

import seaborn as sns
import pandas as pd
from statsmodels.formula.api import ols
import time


# Dataset
df = sns.load_dataset("tips")
df.head()


# Groupby the dataset
df_grouped = df.groupby(["day"])


# Some function to be applied for every grouped element
def regression_model(df):
    """
    This function estimates a linear regression model and returns coefs as dictionary
    """
    model = ols('tip ~ total_bill + C(sex) + size', data = df)
    return dict(model.fit().params)

# Performing the function in the for loop ------ Slow. We want to perform it for each grouped element simultaneously.
coefs_dict = {}

for i, j in df_grouped:
    coefs_i = regression_model(j)
    coefs_dict[i] = coefs_i
    
    # Artificial sleep so we can demostrate that the "mechanical" for loop is slow.... 
    
    time.sleep(2)

在这种特殊情况下,我使用“睡眠”模块来使其更慢,以证明 for 循环将花费大量时间,特别是如果我们将按更多数量的独特类别进行分组。

标签: pythonpandasperformancefor-loopparallel-processing

解决方案


您可以multiprocessing按照@JérômeRichard 的建议使用模块,并Pool.starmap使用groupby

import pandas as pd
import multiprocessing


def regression_model(keys, df):
    print(f'Pool: {keys}')
    # do stuff here
    return df


if __name__ == '__main__':
    data = []
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.starmap(regression_model, df.groupby('day'))
        df2 = pd.concat(data)

推荐阅读