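python - Parallel processing with concurrent.futures
Problem description
I am trying to process a DataFrame in parallel using the different methods shown in this tutorial: https://www.youtube.com/watch?v=fKl2JW_qrso (from minute 18:26 onward). However, the result shows that something is going wrong. The idea of the code is to create a new column ['denominator'] in the DataFrame that holds, for each row, the sum of the 'basalareap', 'basalareas' and 'basalaread' columns. Any suggestions as to what is wrong here, and why I get this strange result when printing? Also, are there other ways to parallelize this more efficiently?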
import pandas as pd
import numpy as np
import concurrent.futures
from multiprocessing import cpu_count

np.random.seed(4)
layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),
                     columns=list(['basalareap', 'basalareas', 'basalaread']))

def denom():
    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)

data_split = np.array_split(layer,cpu_count())
with concurrent.futures.ProcessPoolExecutor() as executor:
    results = [executor.submit(denom) for i in data_split]
print(results)
>>>print(results)
[<Future at 0x1b45e325108 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357708 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e3577c8 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357888 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357948 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357a48 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357b08 state=finished raised BrokenProcessPool>,
<Future at 0x1b45e357bc8 state=finished raised BrokenProcessPool>]
My system: Windows 10, Python 3.7.4
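The printed list contains `Future` objects, not results: `executor.submit` returns a `Future` immediately, and the task's return value (or the exception it raised) is only obtained by calling `future.result()`. Every future here reports `BrokenProcessPool`, which means the worker processes terminated abruptly. On Windows, `ProcessPoolExecutor` starts workers with the spawn method, which re-imports the main script in each worker; without an `if __name__ == '__main__':` guard, each worker most likely re-runs the module-level code that creates the pool, fails during start-up, and the pool breaks. Separately, `denom()` takes no argument and writes into the global `layer`, so even a healthy pool would only modify each worker's private copy and return `None`. The following is a minimal sketch that keeps the question's `submit` approach but passes each chunk in, returns it, and collects the values with `.result()`. `COLS` and `make_layer` are illustrative names and the choice of 4 chunks is arbitrary; rows are split by position with `iloc` because recent pandas versions may warn when `np.array_split` is applied to a DataFrame directly.

```python
import concurrent.futures as cf

import numpy as np
import pandas as pd

# Column names from the question; COLS and make_layer are illustrative names.
COLS = ["basalareap", "basalareas", "basalaread"]


def denom(chunk):
    # Work on the chunk passed in (not a global) and return the result,
    # so the parent process receives the updated copy.
    chunk = chunk.copy()
    chunk["denominator"] = chunk[COLS].sum(axis=1)
    return chunk


def make_layer():
    np.random.seed(4)
    return pd.DataFrame(np.random.randint(0, 25, size=(10, 3)), columns=COLS)


if __name__ == "__main__":
    # Only the parent process runs this block; spawned workers re-import
    # the module but skip it, so they never try to start their own pool.
    layer = make_layer()
    # Split row positions into 4 blocks (arbitrary) and select each with iloc.
    chunks = [layer.iloc[idx] for idx in np.array_split(np.arange(len(layer)), 4)]
    with cf.ProcessPoolExecutor() as executor:
        futures = [executor.submit(denom, chunk) for chunk in chunks]
        print(futures[0])  # prints a Future object, not a DataFrame
        # result() waits for the task and returns its value,
        # or re-raises the exception raised in the worker.
        parts = [f.result() for f in futures]
    result = pd.concat(parts)
    print(result)
```

Because the futures are read back in submission order, `pd.concat(parts)` restores the original row order.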
Solution
Here is one way to make it work (using your sample data):
import pandas as pd
import numpy as np
import concurrent.futures as cf
from multiprocessing import cpu_count

np.random.seed(4)
layer = pd.DataFrame(np.random.randint(0,25,size=(10, 3)),
                     columns=list(['basalareap', 'basalareas', 'basalaread']))

def denom(layer):
    # works on the chunk it receives and returns it to the parent process
    layer['denominator'] = layer[["basalareap","basalareas","basalaread"]].sum(axis=1)
    return layer

if __name__ == '__main__':
    data_split = np.array_split(layer,cpu_count())
    # helper that maps func over the inputs in a process pool
    def cpu_tasks(func, *args):
        with cf.ProcessPoolExecutor() as tp:
            result = tp.map(func, *args, chunksize=10)
            return list(result)
    # get result
    newdf = cpu_tasks(denom, data_split)
    # convert list to dataframe
    newdf = pd.concat(newdf)
    print(newdf)
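One detail in `cpu_tasks`: the `chunksize` argument of `ProcessPoolExecutor.map` sets how many input items are batched into a single task sent to a worker. Because `data_split` holds `cpu_count()` items, on any machine where `cpu_count()` is 10 or less all chunks fit into one batch, so a single worker process computes them one after another. The result is still correct, but nothing runs in parallel. The sketch below makes this visible by recording worker PIDs; `tag_with_pid`, `worker_pids` and `max_workers=4` are illustrative choices, not part of the answer.

```python
import concurrent.futures as cf
import os


def tag_with_pid(item):
    # Return the item together with the PID of the worker process that ran it.
    return item, os.getpid()


def worker_pids(items, chunksize):
    # Collect the distinct worker PIDs that handled the items.
    with cf.ProcessPoolExecutor(max_workers=4) as executor:
        return {pid for _, pid in executor.map(tag_with_pid, items, chunksize=chunksize)}


if __name__ == "__main__":
    items = list(range(8))  # stand-ins for 8 DataFrame chunks
    # 8 items with chunksize=10 fit in one batch -> one task -> one worker.
    print(len(worker_pids(items, chunksize=10)))  # 1
    # With the default chunksize=1 every item is its own task,
    # so several workers can share the load (exact count varies).
    print(len(worker_pids(items, chunksize=1)))
```

Omitting `chunksize` (default 1) lets each chunk become its own task that any idle worker can pick up.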
   basalareap  basalareas  basalaread  denominator
0          14          23           5           42
1           1           8          23           32
2           8          18           9           35
3           7          13          23           43
4          23           8           4           35
5          18          12           6           36
6          10          20           3           33
7           0          23          21           44
8          21           9           6           36
9           6          24           2           32
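On the question of efficiency: for a computation as cheap as a row sum, a single vectorized `sum(axis=1)` over the whole frame is usually the fastest option. A process pool has to start worker processes and pickle every chunk to the worker and back, and that overhead typically outweighs the arithmetic; pools tend to pay off when the per-chunk work is expensive, CPU-bound Python code. The sketch below compares both on a synthetic 200,000-row frame (the size and random data are assumptions for illustration; `add_denominator` and `parallel` are hypothetical helper names). Timings depend on the machine, so none are claimed here.

```python
import concurrent.futures as cf
import time

import numpy as np
import pandas as pd

COLS = ["basalareap", "basalareas", "basalaread"]


def add_denominator(df):
    # Plain vectorized row sum; works on a whole frame or on one chunk.
    out = df.copy()
    out["denominator"] = out[COLS].sum(axis=1)
    return out


def parallel(df, n_chunks=4):
    # Hypothetical helper: split by row position, run chunks in worker processes.
    # executor.map yields results in input order, so concat keeps the row order.
    chunks = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), n_chunks)]
    with cf.ProcessPoolExecutor() as executor:
        return pd.concat(list(executor.map(add_denominator, chunks)))


if __name__ == "__main__":
    # Synthetic data; the size is an arbitrary choice for the comparison.
    rng = np.random.default_rng(4)
    df = pd.DataFrame(rng.integers(0, 25, size=(200_000, 3)), columns=COLS)

    t0 = time.perf_counter()
    serial = add_denominator(df)
    t1 = time.perf_counter()
    pooled = parallel(df)
    t2 = time.perf_counter()

    print(f"vectorized: {t1 - t0:.4f}s  process pool: {t2 - t1:.4f}s")
    print(serial.equals(pooled))  # same values, same row order
```

If the per-row work were expensive Python code rather than a built-in reduction, the process pool version would be the one worth keeping; on Windows the `__main__` guard is needed either way.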