首页 > 解决方案 > joblib 与大型 pandas.DataFrame 并行慢

问题描述

我正在尝试并行化一个基本上循环数据帧数组的 for 循环。显然我遗漏了一些东西,因为这适用于相对较小的 DataFrame(~1e5 行),但对于较大的 DataFrame(~3e6 行)来说会变得非常慢。

最小(不是真的,但我试过)的例子:

import numpy as np
import pandas as pd
import multiprocessing
from joblib import Parallel, delayed

class Analysis:

    # ...
    # istantiate the pd.DataFrame self.data 

    # Checks for every person (pid) if the difference between consecutive values of lifetime is equal to 
    # some value (self.samp). If not, increase the 'trip #' of the corresponding row entries 
    def trip_manager(self):
      
        self.data.insert(0, 'trip #', 0)
        for pid in self.data.index.unique():
            datadf = self.data.loc[pid]

            newtrip = 0
            while(1):

                try:
                    arr = np.where(datadf['lifetime'].values[newtrip+1:] -
                                   datadf['lifetime'].values[newtrip:-1]
                                   != self.samp)[0]

                except AttributeError:
                    break

                if(len(arr) == 0):  # if only one value, datadf is a pd.Series
                    break

                newtrip += arr[0] + 1
                datadf.iloc[newtrip:, 0] += 1


class MultiAnalysis:

    # ...
    # istantiate the array of Analysis self.sim

    # For each element of the array execute trip_manager()
    def aggregate(self):

        num_cores = multiprocessing.cpu_count()
        njobs = num_cores
        inputs = range(len(self.msim))

        def wrapper(sim):
            sim.trip_manager()
            return sim

        if __name__ == '__main__':
            self.sim = Parallel(n_jobs=njobs, verbose=100, backend='loky')(
                                delayed(wrapper)(self.sim[i]) for i in inputs)

当进程被分叉并且元素作为参数传递给包装器时,肯定有更好的方法来处理内存以避免复制 self.sim 的元素,但我认为问题不在于,因为速度变慢了是trip_manager() 中的for 循环。

任何帮助或评论表示赞赏,谢谢。

标签: pythonpandasjoblib

解决方案


推荐阅读