首页 > 解决方案 > Python 多处理 Linux 上的大型数据帧

问题描述

如标题所示,我有一个大数据dfdfmultiprocessing鉴于我的写作技巧和任务的复杂性,我将简要描述我想要实现的目标,并为代码保留细节。

原始数据是df,我想从中执行一些逐行分析(顺序无关紧要),这不仅需要焦点行本身,还需要满足某些条件的其他行。下面是玩具数据和我的代码,

import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math

# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3,
                   'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})

df['row_id'] = df.index

print(df)

    value district region  row_id
0       8    upper      A       0
1       4    upper      A       1
2       0    upper      A       2
3       3    upper      A       3
4       0    upper      A       4
5       0     down      A       5
6       3     down      A       6
7       7     down      A       7
8       1     down      A       8
9       7     down      A       9
10      7    upper      B      10
11      3    upper      B      11
12      9    upper      B      12
13      8    upper      B      13
14      2    upper      B      14
15      4     down      B      15
16      5     down      B      16
17      3     down      B      17
18      5     down      B      18
19      3     down      B      19
20      3    upper      C      20
21      1    upper      C      21
22      3    upper      C      22
23      0    upper      C      23
24      3    upper      C      24
25      2     down      C      25
26      0     down      C      26
27      1     down      C      27
28      1     down      C      28
29      0     down      C      29

我想要做的是添加另外两列count_band ,它只是计算在相同和子集中count_a范围内 (value - 2, value) 和 (value, value + 2) 的行数,例如, 对于row应该是 0,因为and中没有行的值是 7,它落在 (8-2, 8) 中。所以期望的输出应该是:regiondistrictcount_brow_id==0region=='A'district == 'upper'

   count_a count_b region row_id
0        0       0      A      0
1        0       1      A      1
2        0       0      A      2
3        1       0      A      3
4        0       0      A      4
5        1       0      A      5
6        0       0      A      6
7        0       0      A      7
8        0       1      A      8
9        0       0      A      9
10       1       0      B     10
11       0       1      B     11
12       0       1      B     12
13       1       1      B     13
14       1       0      B     14
15       2       2      B     15
16       0       1      B     16
17       1       0      B     17
18       0       1      B     18
19       1       0      B     19
20       0       0      C     20
21       0       1      C     21
22       0       0      C     22
23       1       0      C     23
24       0       0      C     24
25       0       2      C     25
26       2       0      C     26
27       1       2      C     27
28       1       2      C     28
29       2       0      C     29

问题1: 这样的任务可以向量化吗?

问题2: 我们如何使用multiprocessing它来加速(已解决)?

我决定使用multiprocessing的原因是我不确定如何通过矢量化来实现这一点。解决方案是(基于提供的答案)

多处理

def b_a(input_df,r_d):
    print('length of input dataframe: ' + str(len(input_df)))
    # print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
    sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]

    print('length of sliced dataframe: ' + str(len(sub_df)))

    print(r_d[0],r_d[1])


    b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])

    for id in sub_df['row_id']:
        print('processing row: ' + str(id))
        focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
        temp_b = sub_df.loc[
            (sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
        temp_a = sub_df.loc[
            (sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]

        if len(temp_a):
            temp_a['count_a'] = temp_a['row_id'].count()
        else:
            temp_a = temp_a.append(pd.Series(), ignore_index=True)
            temp_a = temp_a.reindex(
                columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
            print(temp_a)

        if len(temp_b):
            temp_b['count_b'] = temp_b['row_id'].count()
        else:
            temp_b = temp_b.append(pd.Series(), ignore_index=True)
            temp_b = temp_b.reindex(
                columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
        print(len(temp_a),len(temp_b))

        temp_b.drop_duplicates('count_b', inplace=True)
        temp_a.drop_duplicates('count_a', inplace=True)
        temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
                          temp_a[['count_a']].reset_index(drop=True)], axis=1)

        temp['row_id'] = id
        temp['region'] = str(r_d[0])

        b_a = pd.concat([b_a, temp])

    return b_a

r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))


if __name__ == '__main__':
    P = Pool(3)
    out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
                             list(itertools.chain.from_iterable(r_d_list)))) # S3

    # out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
    # out = P.starmap(b_a,zip(df,r_d_list)) # S1

    # print(out)
    P.close()
    P.join()
    final = pd.concat(out, ignore_index=True)
    print(final)

    final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))

由于使用P.starmap( 以及P.map) 需要向函数提供所有可能的参数对 for b_a因此解决方案S1将不起作用,因为zip(df,r_d_list)实际上会在 的列名df和 中的元素之间产生 zip r_d_list,这将导致错误AttributeError: 'str' object has no attribute 'loc' ,因为input_dffor 函数b_a是字面意思是一个字符串(列名 df),可以通过查看 的输出来验证print('length of input dataframe: ' + str(len(input_df))),这将产生input_df(在这种情况下df)的列名的长度。接受的答案通过创建一个S2与参数列表 ( ) 具有相同长度的参考数组 ( )(不确定它到底是什么)来纠正这个问题r_d_list。此解决方案效果很好,但可能会很慢df很大,因为据我个人的理解,它需要在整个数据帧中搜索每对参数(regiondistrcit),所以我想出了一个修改版本,它根据数据将数据分成块regiondistrcit然后在每个块中搜索,而不是整个数据帧(S3)。对我来说,这个解决方案在运行时间方面将性能提高了 20%,代码如下:

region = df['region'].unique()

chunk_numbers = 3

chunk_region = math.ceil(len(region) / chunk_numbers)

chunks = list()

r_d_list = list()

row_count = 0

for i in range(chunk_numbers):

    print(i)

    if i < chunk_numbers-1:
        regions = region[(i*chunk_region):((i+1)*chunk_region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    else:
        regions = region[(i * chunk_region):len(region)]
        temp = df.loc[df['region'].isin(regions.tolist())]
        chunks.append(temp)
        r_d_list.append(list(itertools.product(regions,temp['district'].unique())))

        del temp

    row_count = row_count + len(chunks[i])
    print(row_count)

print(df)在and之间添加这个def b_a(),记得注释掉r_d_list = ...之前的if __name__ == '__main__'

感谢这个精彩的社区,我现在有了一个可行的解决方案,我更新了我的问题,为将来可能遇到同样问题的人提供一些材料,并更好地制定问题以获得更好的解决方案。

标签: pythonpandaspython-multiprocessing

解决方案


我认为这里有改进的空间。我建议你在里面定义一个函数groupby

import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
                   'district': (['upper'] * 5 + ['down'] * 5) * 3000,
                   'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
                   'row_id': np.arange(N)})

以下函数返回给定组中的每一count_acount_b

def fun(vec):
    out = []
    for i, v in enumerate(vec):
        a = vec[:i] + vec[i+1:]
        count_a = np.isin(a, [v-2, 2]).sum()
        count_b = np.isin(a, [v, v+2]).sum()
        out.append([count_a, count_b])
    return out

熊猫

%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]\
                               .apply(lambda x: fun(x))\
                               .explode().apply(pd.Series)\
                               .reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s

达斯克

现在您需要再次创建df,然后才能使用dask. 这是我想到的第一件事。肯定有更好/更快的方法。

ddf = dd.from_pandas(df, npartitions=os.cpu_count())

df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]\
                                .apply(lambda x: fun(x.tolist()),
                                       meta=('x', 'f8'))\
                                .compute(scheduler='processes')\
                                .explode().apply(pd.Series)\
                                .reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s

多处理

在这种情况下,您需要再次创建df. 这里的诀窍是拆分为df一个s 列表。lstdf

import multiprocessing as mp
def parallelize(fun, vec, cores):
    with mp.Pool(cores) as p:
        res = p.map(fun, vec)
    return res

def par_fun(d):
    d = d.reset_index(drop=True)
    o = pd.DataFrame(fun(d["value"].tolist()),
                     columns=["count_a", "count_b"])
    return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]

out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s

最终,您可以fun使用numba.


推荐阅读