首页 > 解决方案 > Python:百万条记录的缓慢处理

问题描述

我想处理具有numpy.array如下行的数据(包含在稍后加载到实例中的磁盘文件中):

1 3 a
1 4 b
1 5 a
2 6 b

其中第一列是开始时间,第二列是结束时间,第三列是 id。我想处理这些数据,以便在每个开始时间识别一些 id,例如:

1  2
2  2
3  2
4  2
5  2
6  1

其中第一列是开始时间,第二列是 ID 数

我编写了以下代码将其处理为:

j=[]                                          # a list of ids
for i in range( len( dataset1 ) ):
    indices = numpy.argwhere( ( dataset1[i,0] >= dataset[:,0] )
                            & ( dataset1[i,0] <= dataset[:,1] )
                              )
    j.append( len( set( dataset[indices[:,0],2] ) ) )

其中:
-dataset1第一列为 1,2,3,4,5,6 个时间戳,并且
-dataset有三列:开始时间、结束时间和 id。

我必须处理中给出的大约 9 亿行dataset1。这是非常缓慢的。

我试图将其并行化为:

inputs = range( len( dataset1 ) )

def processInput( b ):
    indices = numpy.argwhere( ( b >= dataset[:,0] )
                            & ( b <= dataset[:,1] )
                              )
    return( len( set( dataset[indices[:,0],2] ) ) )

num_cores = 10

results = Parallel( n_jobs = num_cores )( delayed( processInput )( dataset[j,0] ) for j in inputs )

但这仍然很慢。我还有 10 个可用内核,但随后磁盘成为瓶颈。

有什么方法可以快速处理这些数据?

标签: pythonparallel-processing

解决方案


有什么方法可以快速处理这些数据?

就在这里。

(由于 GIL 锁定,Python线程在这里没有一点帮助(在纯顺序处理中重新序列化所有工作[SERIAL],增加额外开销以增加寻找 GIL 锁定获取的强度),Python进程-基于并行性是昂贵的,并且会复制所有 RAM 数据,包括 itnerpreter 多次请求(磁盘块,因为它交换 RAM,不是因为顺利读取通道而是< 1E9文件中的一些数据行,当然,除非你有多[TB] RAM 设备一次保存所有 python 进程/数据副本))


第 1 步:
将有效的 DATA 流设置为恰到好处的高效处理

id准备数据文件以最适合您对-s的进一步微不足道的计数

sort -k1,1       \
     -k3          \
     --parallel=19 \
     --output=dataset1_SORTED_DATA.txt < dataset1_data_file.txt

sort --parallel=19 \
     --output=dataset_SORTED_GATEs.txt < dataset_T1_T2_GATEs_data_file.txt

第2步:
按照从第二个文件读取的“门控”顺序处理排序文件

接下来的第一个dataset1_SORTED_DATA.txt文件简单地处理(计算与条件匹配的连续的、顺序的行块),只按顺序读取一次,如使用第二个文件< T1_start, T2_end >中准备的同样排序的门所指示的那样。dataset_SORTED_GATEs.txt

这几乎是一个流处理,是平滑的,只使用简单的行计数,满足...SORTED_GATEs.txt数据文件中的两个条件,其中< T1, T2 >-gates 再次单调地变得越来越大,因此第一个...SORTED_DATA.txt文件被处理在一个文件中,顺利通过id,按要求只计算-s。


推荐阅读