python - Python 多处理 Linux 上的大型数据帧
问题描述
如标题所示,我有一个大数据df
框df
(multiprocessing
鉴于我的写作技巧和任务的复杂性,我将简要描述我想要实现的目标,并为代码保留细节。
原始数据是df
,我想从中执行一些逐行分析(顺序无关紧要),这不仅需要焦点行本身,还需要满足某些条件的其他行。下面是玩具数据和我的代码,
import pandas as pd
import numpy as np
import itertools
from multiprocessing import Pool
import time
import math
# a test example
start_time = time.time()
df = pd.DataFrame({'value': np.random.randint(0, 10, size=30),
'district': (['upper'] * 5 + ['down'] * 5) * 3,
'region': ['A'] * 10 + ['B'] * 10 + ['C'] * 10})
df['row_id'] = df.index
print(df)
value district region row_id
0 8 upper A 0
1 4 upper A 1
2 0 upper A 2
3 3 upper A 3
4 0 upper A 4
5 0 down A 5
6 3 down A 6
7 7 down A 7
8 1 down A 8
9 7 down A 9
10 7 upper B 10
11 3 upper B 11
12 9 upper B 12
13 8 upper B 13
14 2 upper B 14
15 4 down B 15
16 5 down B 16
17 3 down B 17
18 5 down B 18
19 3 down B 19
20 3 upper C 20
21 1 upper C 21
22 3 upper C 22
23 0 upper C 23
24 3 upper C 24
25 2 down C 25
26 0 down C 26
27 1 down C 27
28 1 down C 28
29 0 down C 29
我想要做的是添加另外两列count_b
and ,它只是计算在相同和子集中count_a
范围内 (value - 2, value) 和 (value, value + 2) 的行数,例如,
对于row应该是 0,因为and中没有行的值是 7,它落在 (8-2, 8) 中。所以期望的输出应该是:region
district
count_b
row_id==0
region=='A'
district == 'upper'
count_a count_b region row_id
0 0 0 A 0
1 0 1 A 1
2 0 0 A 2
3 1 0 A 3
4 0 0 A 4
5 1 0 A 5
6 0 0 A 6
7 0 0 A 7
8 0 1 A 8
9 0 0 A 9
10 1 0 B 10
11 0 1 B 11
12 0 1 B 12
13 1 1 B 13
14 1 0 B 14
15 2 2 B 15
16 0 1 B 16
17 1 0 B 17
18 0 1 B 18
19 1 0 B 19
20 0 0 C 20
21 0 1 C 21
22 0 0 C 22
23 1 0 C 23
24 0 0 C 24
25 0 2 C 25
26 2 0 C 26
27 1 2 C 27
28 1 2 C 28
29 2 0 C 29
问题1: 这样的任务可以向量化吗?
问题2: 我们如何使用multiprocessing
它来加速(已解决)?
我决定使用multiprocessing
的原因是我不确定如何通过矢量化来实现这一点。解决方案是(基于提供的答案)
多处理
def b_a(input_df,r_d):
print('length of input dataframe: ' + str(len(input_df)))
# print('region: ' + str(r_d[0]), 'district: ' + str(r_d[1]))
sub_df = input_df.loc[(input_df['region'].isin([r_d[0]])) & (input_df['district'].isin([r_d[1]]))]
print('length of sliced dataframe: ' + str(len(sub_df)))
print(r_d[0],r_d[1])
b_a = pd.DataFrame(columns=['count_a', 'count_b', 'row_id', 'region'])
for id in sub_df['row_id']:
print('processing row: ' + str(id))
focal_value = sub_df.loc[sub_df['row_id'].isin([id])]['value']
temp_b = sub_df.loc[
(sub_df['value'] > (focal_value - 2).values[0]) & (sub_df['value'] < (focal_value.values[0]))]
temp_a = sub_df.loc[
(sub_df['value'] > (focal_value.values[0])) & (sub_df['value'] < (focal_value + 2).values[0])]
if len(temp_a):
temp_a['count_a'] = temp_a['row_id'].count()
else:
temp_a = temp_a.append(pd.Series(), ignore_index=True)
temp_a = temp_a.reindex(
columns=[*temp_a.columns.tolist(), 'count_a'], fill_value=0)
print(temp_a)
if len(temp_b):
temp_b['count_b'] = temp_b['row_id'].count()
else:
temp_b = temp_b.append(pd.Series(), ignore_index=True)
temp_b = temp_b.reindex(
columns=[*temp_b.columns.tolist(), 'count_b'], fill_value=0)
print(len(temp_a),len(temp_b))
temp_b.drop_duplicates('count_b', inplace=True)
temp_a.drop_duplicates('count_a', inplace=True)
temp = pd.concat([temp_b[['count_b']].reset_index(drop=True),
temp_a[['count_a']].reset_index(drop=True)], axis=1)
temp['row_id'] = id
temp['region'] = str(r_d[0])
b_a = pd.concat([b_a, temp])
return b_a
r_d_list = list(itertools.product(df['region'].unique(),df['district'].unique()))
if __name__ == '__main__':
P = Pool(3)
out = P.starmap(b_a, zip([chunks[r_d_list.index(j)] for j in r_d_list for i in range(len(j))],
list(itertools.chain.from_iterable(r_d_list)))) # S3
# out = P.starmap(b_a, zip([df for i in range(len(r_d_list))], r_d_list)) # S2
# out = P.starmap(b_a,zip(df,r_d_list)) # S1
# print(out)
P.close()
P.join()
final = pd.concat(out, ignore_index=True)
print(final)
final.to_csv('final.csv',index=False)
print("--- %s seconds ---" % (time.time() - start_time))
由于使用P.starmap
( 以及P.map
) 需要向函数提供所有可能的参数对 for ,b_a
因此解决方案S1
将不起作用,因为zip(df,r_d_list)
实际上会在 的列名df
和 中的元素之间产生 zip r_d_list
,这将导致错误AttributeError: 'str' object has no attribute 'loc'
,因为input_df
for 函数b_a
是字面意思是一个字符串(列名 df),可以通过查看 的输出来验证print('length of input dataframe: ' + str(len(input_df)))
,这将产生input_df
(在这种情况下df
)的列名的长度。接受的答案通过创建一个S2
与参数列表 ( ) 具有相同长度的参考数组 ( )(不确定它到底是什么)来纠正这个问题r_d_list
。此解决方案效果很好,但可能会很慢df
很大,因为据我个人的理解,它需要在整个数据帧中搜索每对参数(region
和distrcit
),所以我想出了一个修改版本,它根据数据将数据分成块region
,distrcit
然后在每个块中搜索,而不是整个数据帧(S3)。对我来说,这个解决方案在运行时间方面将性能提高了 20%,代码如下:
region = df['region'].unique()
chunk_numbers = 3
chunk_region = math.ceil(len(region) / chunk_numbers)
chunks = list()
r_d_list = list()
row_count = 0
for i in range(chunk_numbers):
print(i)
if i < chunk_numbers-1:
regions = region[(i*chunk_region):((i+1)*chunk_region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
else:
regions = region[(i * chunk_region):len(region)]
temp = df.loc[df['region'].isin(regions.tolist())]
chunks.append(temp)
r_d_list.append(list(itertools.product(regions,temp['district'].unique())))
del temp
row_count = row_count + len(chunks[i])
print(row_count)
print(df)
在and之间添加这个def b_a()
,记得注释掉r_d_list = ...
之前的if __name__ == '__main__'
。
感谢这个精彩的社区,我现在有了一个可行的解决方案,我更新了我的问题,为将来可能遇到同样问题的人提供一些材料,并更好地制定问题以获得更好的解决方案。
解决方案
我认为这里有改进的空间。我建议你在里面定义一个函数groupby
import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
N = 30_000
# Now the example is reproducible
np.random.seed(0)
df = pd.DataFrame({'value': np.random.randint(0, 10, size=N),
'district': (['upper'] * 5 + ['down'] * 5) * 3000,
'region': ['A'] * 10_000 + ['B'] * 10_000 + ['C'] * 10_000,
'row_id': np.arange(N)})
以下函数返回给定组中的每一count_a
行count_b
def fun(vec):
out = []
for i, v in enumerate(vec):
a = vec[:i] + vec[i+1:]
count_a = np.isin(a, [v-2, 2]).sum()
count_b = np.isin(a, [v, v+2]).sum()
out.append([count_a, count_b])
return out
熊猫
%%time
df[["count_a", "count_b"]] = df.groupby(["district", "region"])["value"]\
.apply(lambda x: fun(x))\
.explode().apply(pd.Series)\
.reset_index(drop=True)
CPU times: user 22.6 s, sys: 174 ms, total: 22.8 s
Wall time: 22.8 s
达斯克
现在您需要再次创建df
,然后才能使用dask
. 这是我想到的第一件事。肯定有更好/更快的方法。
ddf = dd.from_pandas(df, npartitions=os.cpu_count())
df[["count_a", "count_b"]] = ddf.groupby(["district", "region"])["value"]\
.apply(lambda x: fun(x.tolist()),
meta=('x', 'f8'))\
.compute(scheduler='processes')\
.explode().apply(pd.Series)\
.reset_index(drop=True)
CPU times: user 6.92 s, sys: 114 ms, total: 7.04 s
Wall time: 13.4 s
多处理
在这种情况下,您需要再次创建df
. 这里的诀窍是拆分为df
一个s 列表。lst
df
import multiprocessing as mp
def parallelize(fun, vec, cores):
with mp.Pool(cores) as p:
res = p.map(fun, vec)
return res
def par_fun(d):
d = d.reset_index(drop=True)
o = pd.DataFrame(fun(d["value"].tolist()),
columns=["count_a", "count_b"])
return pd.concat([d,o], axis=1)
%%time
lst = [l[1] for l in list(df.groupby(["district", "region"]))]
out = parallelize(par_fun, lst, os.cpu_count())
out = pd.concat(out, ignore_index=True)
CPU times: user 152 ms, sys: 49.7 ms, total: 202 ms
Wall time: 5 s
最终,您可以fun
使用numba
.
推荐阅读
- python - 为什么在相同数据上训练准确率为 99%,而预测准确率为 81%?
- c++ - __builtin_bswap16 是否可用于签名短片?
- asp.net-core - 如何在 url asp core 中隐藏默认语言
- flutter - 我收到 Range 错误,例如 Invalid value: Not in range 16..17, inclusive: 19Error: RangeError (end):
- excel - 如何以 XML 格式声明电子表格的 codeName?
- haskell - 我可以表达一个子类约束吗?
- darknet - 暗网模型到 onnx
- python - 设置验证数据
- kotlin - IDE 中的 Kotlin“无需强制转换”
- modelica - OpenModelica:FMU 导出的第二级输入