python - joblib slow when parallelizing over a large pandas.DataFrame
Problem description
I am trying to parallelize a for loop that essentially iterates over an array of DataFrames. I am clearly missing something, because this works fine for relatively small DataFrames (~1e5 rows) but becomes very slow for larger ones (~3e6 rows).
A minimal (not really, but I tried) example:
import numpy as np
import pandas as pd
import multiprocessing
from joblib import Parallel, delayed
class Analysis:
    # ...
    # instantiate the pd.DataFrame self.data

    # Checks for every person (pid) whether the difference between consecutive values of
    # lifetime equals some value (self.samp). If not, increase the 'trip #' of the
    # corresponding row entries.
    def trip_manager(self):
        self.data.insert(0, 'trip #', 0)
        for pid in self.data.index.unique():
            datadf = self.data.loc[pid]
            newtrip = 0
            while True:
                try:
                    arr = np.where(datadf['lifetime'].values[newtrip+1:] -
                                   datadf['lifetime'].values[newtrip:-1]
                                   != self.samp)[0]
                except AttributeError:
                    break
                if len(arr) == 0:  # if only one value, datadf is a pd.Series
                    break
                newtrip += arr[0] + 1
                datadf.iloc[newtrip:, 0] += 1
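If I read the loop correctly, it numbers trips by counting every position where the gap between consecutive lifetime values differs from self.samp. Assuming that reading is right, the same result for one pid's rows can be computed in a single vectorized pass instead of the while loop (a sketch with hypothetical standalone names; number_trips and samp are not in the original code):

```python
import numpy as np

def number_trips(lifetime, samp):
    # A trip break occurs wherever the gap between consecutive
    # lifetime values differs from the sampling step `samp`.
    breaks = np.diff(lifetime) != samp
    # The cumulative count of breaks is the trip number of each row;
    # the first row always belongs to trip 0.
    return np.concatenate(([0], np.cumsum(breaks)))

lifetime = np.array([0, 1, 2, 5, 6, 9])
print(number_trips(lifetime, 1))  # [0 0 0 1 1 2]
```

This replaces the O(trips × rows) rescans of the while loop with one O(rows) pass, which may matter much more at ~3e6 rows than the parallelization itself.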
class MultiAnalysis:
    # ...
    # instantiate the array of Analysis objects, self.sim

    # For each element of the array, execute trip_manager()
    def aggregate(self):
        num_cores = multiprocessing.cpu_count()
        njobs = num_cores
        inputs = range(len(self.sim))

        def wrapper(sim):
            sim.trip_manager()
            return sim

        self.sim = Parallel(n_jobs=njobs, verbose=100, backend='loky')(
            delayed(wrapper)(self.sim[i]) for i in inputs)
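If the per-pid logic above is what trip_manager() intends, the whole 'trip #' column can also be filled without the explicit pid loop via groupby/transform. A minimal sketch, assuming the DataFrame is indexed by pid and that samp plays the role of self.samp (the data, the trips helper, and samp = 1 are made up for illustration):

```python
import numpy as np
import pandas as pd

samp = 1  # assumed sampling step (self.samp in the original code)
df = pd.DataFrame(
    {'lifetime': [0, 1, 2, 5, 6, 0, 3]},
    index=pd.Index([1, 1, 1, 1, 1, 2, 2], name='pid'),
)

def trips(s):
    # New trip wherever the gap between consecutive samples != samp;
    # returns one trip number per row of the group.
    return np.concatenate(([0], np.cumsum(np.diff(s.to_numpy()) != samp)))

# transform aligns the per-group results back to the original rows
df['trip #'] = df.groupby(level='pid')['lifetime'].transform(trips)
print(df['trip #'].tolist())  # [0, 0, 0, 1, 1, 0, 1]
```

Operating on the full frame at once also sidesteps the repeated self.data.loc[pid] lookups, which are themselves expensive on a large index.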
There is surely a better way to handle memory and avoid copying the elements of self.sim when the processes are spawned and the elements are passed as arguments to the wrapper, but I don't think that is the problem, since the slowdown is in the for loop inside trip_manager().
Any help or comment is appreciated, thanks.