首页 > 解决方案 > Python:在脚本中实现多线程标志选项

问题描述

我正在编写一个简单的脚本,它将两个TSV文件(file_a.tsvfile_b.tsv)作为输入并解析 的所有值,file_a.tsv以检查它们是否包含在file_b.tsv. 这是脚本:

import os
import sys
import argparse
import pandas as pd

# Defining the main function:
def myfunc() -> tuple:
    ap = argparse.ArgumentParser()
    ap.add_argument("-a", "--file_a", help="path to file_a")
    ap.add_argument("-b", "--file_b", help="path to file_b")
    return ap.parse_args()

args = myfunc()

file_a = args.file_a
file_b = args.file_b

# Initialising of file_a and file_b as dataframes
with open(file_a, 'r') as a:
    file_a_df = pd.read_table(a, header=None)

with open(file_b, 'r') as b:
    file_b_df = pd.read_table(b, header=None)
    file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]

# Here's two list to be used 
contained = []
not_contained = []

# Defining a function for the file_a parsing
def fileparser(val):
    for index, row in file_b_df.iterrows():
        if val >= row['start'] and val <= row['end']:
            contained.append(val)
            return True
    not_contained.append(val)

# Apply fileparser
file_a.iloc[:, 1].apply(fileparser)

print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)

从终端运行它看起来像这样:

python my_script.py -a "path/to/file_a" -b "path/to/file_b"

问题来了:file_a有近 700 万个值和file_b数千个要检查的范围,所以用主线程完成这是一个相当大的过程。

我想像-p下面这样在我的管道中添加一个标志,以实现多线程选项并加快进程:

python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"

我知道threading可以导入库,如何将其添加到我的脚本中?谢谢你。

标签: pythonpandasmultithreadingdataframepython-multithreading

解决方案


如果您有数百万行,则最好在并行化之前先优化算法的复杂性(big-o)。在这里,您正在比较所有范围内的所有a项目,b更重要的是慢方法.apply()和手动迭代行 - 让我们也用 pandas/numpy 原语替换所有这些。

免费的好处是您不再需要自己处理并行性, numpy 会在后台为您完成。例如,请参阅«使用并行原语»部分下的本指南。

我假设出于优化目的,您N在 file_a 和M不同的间隔中有值。

1. 我们可以首先将所有b区间合并为最小数量的不同区间

>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0      1
36     0
38     1
40     0
41     1
86     0
87     1
103    0
dtype: int64
>>> min_intervals = pd.DataFrame({
...     'start': idx[depth.eq(0).shift(fill_value=True)],
...     'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
   start  end
0      0   35
1     38   39
2     41   85
3     87  102

这里主要的微妙之处是+1-1结束界限,因为界限是包容性的,我们想要正确计算连续区间的并集,即[3, 5]并且[6, 8]应该是[3, 8]

当然,如果您知道您的间隔已经分开,那么您可以跳过所有这些并执行min_intervals = file_b_df[['start', 'end']].sort_values('start').

2. 将值与排序边界进行比较,O(N×M) -> O(N×log(M))

请注意,现在min_intervals已排序,如果我们将其堆叠,我们将按排序顺序获得边界。现在让我们添加回来+1,我们可以用它pd.Series.searchsorted()来找出应该在这个边界序列中的哪个位置插入一个数字以保持顺序:

>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0  start      0
   end       36
1  start     38
   end       40
2  start     41
   end       86
3  start     87
   end      103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])

如您所见,区间内的数字([0, 35] 中的 0, 1 和 35,[87, 103] 中的 100)返回奇数,不在区间 (36, 86) 中的数字返回偶数。

>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

如果您不想修改file_a,当然如果您愿意排序,这很有效,file_a您可以获得更快的结果(假设 N > M)。

3. 将排序后的值与区间界限进行比较,O(N×log(M)) -> O(log(N)×M)

>>> file_a.sort_values('val', inplace=True)

从那里我们现在可以使用searchsortedon file_a,例如计算包含的值的行:

>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0  start    0
   end      3
1  start    4
   end      4
2  start    4
   end      4
3  start    5
   end      6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
   val
0    0
1    1
2   35
4  100

或者我们可以用与区间交集相同的方式构造一个布尔索引器:

>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index  # Index was row numbers, use file_a’s index
>>> contained
0     True
1     True
2     True
3    False
5    False
4     True
dtype: bool
>>> file_a[contained]
   val
0    0
1    1
2   35
4  100
>>> file_a[~contained]
   val
3   36
5   86

现在,这种策略的另一个好处是我们编写的代码中从未进行过循环。因此,除了允许 numpy 使用它之外,我们不需要关心并行化。如果您真的想自己添加并行优化,那么另一个问题将会很有趣。


推荐阅读