Proper multiprocessing syntax to apply a function to each row of pandas dataframe instead of using chunks

Problem description

I have a dataframe with two columns, each of which contains a list of indices. I want to take the product of the two lists row by row to create a MultiIndex.

For example, df1 below


| col_a    | col_b    |
|----------|----------|
| [A1, A2] | [B1]     |
| [A3]     | [B2, B3] |

would turn into this:

MultiIndex([('A1', 'B1'),
            ('A2', 'B1'),
            ('A3', 'B2'),
            ('A3', 'B3')],
names = ['col_a', 'col_b'], length = 4)
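For reference, the desired transformation can be reproduced on this toy df1 in plain pandas, without multiprocessing (a minimal sketch; the DataFrame literal below is constructed from the example table above):

```python
import itertools

import pandas as pd

df1 = pd.DataFrame({
    "col_a": [["A1", "A2"], ["A3"]],
    "col_b": [["B1"], ["B2", "B3"]],
})

# Cartesian product of the two lists in each row, flattened into one list of tuples
tuples = [
    tup
    for a, b in zip(df1["col_a"], df1["col_b"])
    for tup in itertools.product(a, b)
]
mult_ind = pd.MultiIndex.from_tuples(tuples, names=df1.columns)
print(mult_ind)
```

This is the single-process baseline that the multiprocessing code below parallelizes.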

The size of df1 is about 50K rows, the average length of the list in col_a is 300, and the average length of the list in col_b is 30. Since this is a fairly hefty task, I have decided to go down the multiprocessing route and do the following:

import itertools
from multiprocessing import Pool

import numpy as np
import pandas as pd

def worker(x):
    # Cartesian product of the two lists in each row, concatenated across rows
    return x.apply(lambda row: list(itertools.product(*row)), axis=1).sum()

if __name__ == '__main__':
    num_processes = 56
    data_split = np.array_split(df1, num_processes)
    p = Pool(processes=num_processes)
    output = p.map(worker, data_split)
    output_tup = tuple(itertools.chain(*output))
    mult_ind = pd.MultiIndex.from_tuples(output_tup, names=df1.columns)

While this is much faster than a plain apply, I want to move away from chunking: instead, assign a single row to each of the 56 processes, run the worker function on that row, pickle the output, then hand a new row to whichever process becomes free, repeating until all rows are done.

I'm thinking that I need to use map_async or imap for this, but for the life of me have not been able to get the syntax to work for this. Is it possible to do what I am asking?

Tags: python, pandas, optimization, multiprocessing, apply

Solution
