Parallel processing with concurrent.futures

Problem description

I am trying to find a way to process a DataFrame in parallel, as shown in this tutorial: https://www.youtube.com/watch?v=fKl2JW_qrso (from minute 18:26 onwards), but the result tells me something is going wrong. The idea of the code is to create a new column ['denominator'] in the DataFrame holding the row-wise sum of the 'basalareap', 'basalareas' and 'basalaread' columns. Any suggestions as to what is wrong here, and why I get this strange result when printing? Also, is there a more efficient way to parallelize this?

import pandas as pd
import numpy as np
import concurrent.futures
from multiprocessing import cpu_count

np.random.seed(4)
layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),
                  columns=list(['basalareap', 'basalareas', 'basalaread']))

def denom():
    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)

data_split = np.array_split(layer,cpu_count())


with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(denom) for i in data_split]
print(results)

>>>print(results)
[<Future at 0x1b45e325108 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357708 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e3577c8 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357888 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357948 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357a48 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357b08 state=finished raised BrokenProcessPool>, 
<Future at 0x1b45e357bc8 state=finished raised BrokenProcessPool>]

My system: Windows 10, Python 3.7.4

Tags: python, pandas, parallel-processing

Solution


Here is one way to make it work (using your sample data). Two things matter: on Windows the process pool has to be created under an if __name__ == '__main__': guard, and the worker function has to receive its chunk as an argument and return it, because a column assigned to the global layer inside a child process is never seen by the parent (which is also why your futures ended up in the BrokenProcessPool state).

import pandas as pd
import numpy as np
import concurrent.futures as cf
from multiprocessing import cpu_count

np.random.seed(4)
layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),
                  columns=list(['basalareap', 'basalareas', 'basalaread']))

def denom(layer):
    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)
    return layer

if __name__ == '__main__':

    data_split = np.array_split(layer,cpu_count())

    # helper that maps the worker function over the data chunks in a process pool
    def cpu_tasks(func, *args):

        with cf.ProcessPoolExecutor() as tp:
            result = tp.map(func, *args, chunksize=10)
        return list(result)

    # get result
    newdf = cpu_tasks(denom, data_split)

    # convert list to dataframe
    newdf = pd.concat(newdf)
    print(newdf)


       basalareap  basalareas  basalaread  denominator
    0          14          23           5           42
    1           1           8          23           32
    2           8          18           9           35
    3           7          13          23           43
    4          23           8           4           35
    5          18          12           6           36
    6          10          20           3           33
    7           0          23          21           44
    8          21           9           6           36
    9           6          24           2           32
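As to efficiency: a row-wise sum like this is already vectorized in pandas, so on a frame of this size the process pool mostly adds pickling and start-up overhead; parallelizing only pays off for much larger frames or more expensive per-chunk work. If you prefer to stay closer to your original executor.submit style, the following is a minimal sketch of how that could look. It is my own variant rather than part of the accepted answer, and the chunking mirrors the np.array_split call above.

import pandas as pd
import numpy as np
import concurrent.futures as cf
from multiprocessing import cpu_count

np.random.seed(4)
layer = pd.DataFrame(np.random.randint(0, 25, size=(10, 3)),
                     columns=['basalareap', 'basalareas', 'basalaread'])

def denom(chunk):
    # work on a copy so each child process returns its own result
    chunk = chunk.copy()
    chunk['denominator'] = chunk[['basalareap', 'basalareas', 'basalaread']].sum(axis=1)
    return chunk

if __name__ == '__main__':
    data_split = np.array_split(layer, cpu_count())

    with cf.ProcessPoolExecutor() as executor:
        # one task per chunk; each future carries the processed chunk back
        futures = [executor.submit(denom, chunk) for chunk in data_split]
        results = [f.result() for f in futures]   # keeps the original chunk order

    print(pd.concat(results))

Compared with executor.map, submit gives you one Future per chunk, which can be handy if you want per-chunk error handling or progress reporting; otherwise the map-based version above is the simpler choice.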


