首页 > 解决方案 > Python:按唯一 ID 分组的分块数据的并行化

问题描述

我有巨大的 ( df) 排序Date_startID如下所示:

Date_start    ID      Start_flag    End_flag   Date_end                             
01-01-2019    100     1             0         01-02-2019   
01-02-2019    100     0             0         01-03-2019
01-03-2019    100     0             0         01-04-2019
01-06-2019    100     0             0         01-07-2019
01-09-2019    500     1             0         01-10-2019     
01-11-2019    500     0             0         01-12-2019
01-05-2020    500     0             0         01-06-2020
01-06-2020    500     0             0         01-07-2020
01-07-2020    500     0             0         01-08-2020
01-08-2020    500     0             0         01-09-2020 
01-09-2020    700     1             0         01-12-2020
01-01-2021    700     0             0         01-04-2021
01-04-2021    700     0             1         01-07-2021

我还有一个函数,它有助于在 date_ranges 的每一行中定义重叠。我需要将此功能应用于df分组,ID如下所示:

df.groupby('ID').apply(detect_overlapping)

最后,我按唯一 ID 拆分df为块,最后df_chunked- 是每个块中具有 25000 个 ID 的数据帧列表。为了在我使用的每个块中并行计算multiprocessing.Pool

def applyParallel(grouped_df, func):
    num_cores = 8
    with Pool(num_cores) as pool:
        result_list = pool.map(func, [group for name, group in grouped_df])
        pool.close()
        pool.join()
    return pd.concat(result_list, axis=0)

我在一个循环中应用它df_chunked

result = []
for i in range(0, len(df_chunked)):
   print('chunk# ', i)
   gr_data = df_chunked[i].groupby('ID')
   df_with_target = applyParallel(gr_data, detect_overlapping)
   result.append(df_with_target)

上述方法的性能不佳。我也尝试过测试pandarallel,但它崩溃了df

为了减少它,我试图找到一个如何并行df_chunked化每组[df_chunked[0],df_chunked[1]],[df_chunked[2],df_chunked[3]]...,run的所有和内部的解决方案applyParallel(gr_data, detect_overlapping)

问题:有没有什么办法可以用我描述的更好的方法来解决这个任务?谢谢。

标签: group-byapplypython-multiprocessingpoolchunking

解决方案


推荐阅读