python - Python:在脚本中实现多线程标志选项
问题描述
我正在编写一个简单的脚本,它将两个TSV文件(file_a.tsv
和file_b.tsv
)作为输入并解析 的所有值,file_a.tsv
以检查它们是否包含在file_b.tsv
. 这是脚本:
import os
import sys
import argparse
import pandas as pd
# Defining the main function:
def myfunc() -> tuple:
ap = argparse.ArgumentParser()
ap.add_argument("-a", "--file_a", help="path to file_a")
ap.add_argument("-b", "--file_b", help="path to file_b")
return ap.parse_args()
args = myfunc()
file_a = args.file_a
file_b = args.file_b
# Initialising of file_a and file_b as dataframes
with open(file_a, 'r') as a:
file_a_df = pd.read_table(a, header=None)
with open(file_b, 'r') as b:
file_b_df = pd.read_table(b, header=None)
file_b_df.columns = ["seqname", "source", "feature", "start", "end", "score", "strand", "frame", "attribute"]
# Here's two list to be used
contained = []
not_contained = []
# Defining a function for the file_a parsing
def fileparser(val):
for index, row in file_b_df.iterrows():
if val >= row['start'] and val <= row['end']:
contained.append(val)
return True
not_contained.append(val)
# Apply fileparser
file_a.iloc[:, 1].apply(fileparser)
print("Contained: ", len(contained))
print("Not contained: ", len(not_contained)
从终端运行它看起来像这样:
python my_script.py -a "path/to/file_a" -b "path/to/file_b"
问题来了:file_a
有近 700 万个值和file_b
数千个要检查的范围,所以用主线程完成这是一个相当大的过程。
我想像-p
下面这样在我的管道中添加一个标志,以实现多线程选项并加快进程:
python my_script.py -p 8 -a "path/to/file_a" -b "path/to/file_b"
我知道threading
可以导入库,如何将其添加到我的脚本中?谢谢你。
解决方案
如果您有数百万行,则最好在并行化之前先优化算法的复杂性(big-o)。在这里,您正在比较所有范围内的所有a
项目,b
更重要的是慢方法.apply()
和手动迭代行 - 让我们也用 pandas/numpy 原语替换所有这些。
免费的好处是您不再需要自己处理并行性, numpy 会在后台为您完成。例如,请参阅«使用并行原语»部分下的本指南。
我假设出于优化目的,您N
在 file_a 和M
不同的间隔中有值。
1. 我们可以首先将所有b
区间合并为最小数量的不同区间
>>> starts = df['start'].value_counts().sort_index()
>>> ends = df['end'].value_counts().sort_index()
>>> ends.index += 1
>>> idx = ends.index.join(starts.index, how='outer')
>>> depth = (starts.reindex(idx, fill_value=0) - ends.reindex(idx, fill_value=0)).cumsum()
>>> depth[depth.eq(0) | depth.eq(0).shift(fill_value=True)]
0 1
36 0
38 1
40 0
41 1
86 0
87 1
103 0
dtype: int64
>>> min_intervals = pd.DataFrame({
... 'start': idx[depth.eq(0).shift(fill_value=True)],
... 'end': idx[depth.eq(0)] - 1
... })
>>> min_intervals
start end
0 0 35
1 38 39
2 41 85
3 87 102
这里主要的微妙之处是+1
与-1
结束界限,因为界限是包容性的,我们想要正确计算连续区间的并集,即[3, 5]
并且[6, 8]
应该是[3, 8]
当然,如果您知道您的间隔已经分开,那么您可以跳过所有这些并执行min_intervals = file_b_df[['start', 'end']].sort_values('start')
.
2. 将值与排序边界进行比较,O(N×M) -> O(N×log(M))
请注意,现在min_intervals
已排序,如果我们将其堆叠,我们将按排序顺序获得边界。现在让我们添加回来+1
,我们可以用它pd.Series.searchsorted()
来找出应该在这个边界序列中的哪个位置插入一个数字以保持顺序:
>>> bounds = min_intervals.stack()
>>> bounds.loc[slice(None), ['end']] += 1
>>> bounds
0 start 0
end 36
1 start 38
end 40
2 start 41
end 86
3 start 87
end 103
dtype: int64
>>> bounds.searchsorted([0, 1, 35, 36, 100, 86], 'right')
array([1, 1, 1, 2, 7, 6])
如您所见,区间内的数字([0, 35] 中的 0, 1 和 35,[87, 103] 中的 100)返回奇数,不在区间 (36, 86) 中的数字返回偶数。
>>> file_a = pd.DataFrame({'val': [0, 1, 35, 36, 100, 86]})
>>> contained = pd.Series(bounds.searchsorted(file_a['val'], 'right'), index=file_a.index).mod(2).eq(1)
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
如果您不想修改file_a
,当然如果您愿意排序,这很有效,file_a
您可以获得更快的结果(假设 N > M)。
3. 将排序后的值与区间界限进行比较,O(N×log(M)) -> O(log(N)×M)
>>> file_a.sort_values('val', inplace=True)
从那里我们现在可以使用searchsorted
on file_a
,例如计算包含的值的行:
>>> rows = pd.Series(file_a['val'].searchsorted(bounds, 'left'), index=bounds.index)
>>> rows
0 start 0
end 3
1 start 4
end 4
2 start 4
end 4
3 start 5
end 6
dtype: int64
>>> pd.concat([file_a.iloc[slice(*v)] for k, v in rows.groupby(level=0)])
val
0 0
1 1
2 35
4 100
或者我们可以用与区间交集相同的方式构造一个布尔索引器:
>>> contained = rows.xs('start', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained -= rows.xs('end', 'index', 1).value_counts().reindex(rows.unique(), fill_value=0)
>>> contained = contained.cumsum().reindex(np.arange(len(file_a))).ffill().eq(1)
>>> contained.index = file_a.index # Index was row numbers, use file_a’s index
>>> contained
0 True
1 True
2 True
3 False
5 False
4 True
dtype: bool
>>> file_a[contained]
val
0 0
1 1
2 35
4 100
>>> file_a[~contained]
val
3 36
5 86
现在,这种策略的另一个好处是我们编写的代码中从未进行过循环。因此,除了允许 numpy 使用它之外,我们不需要关心并行化。如果您真的想自己添加并行优化,那么另一个问题将会很有趣。
推荐阅读
- javascript - 从鼠标单击事件侦听器添加对 Perl CGI 脚本的调用时出现问题
- javascript - React Native 嵌套扁平列表性能不佳
- c++ - 对象生命周期的结束与它何时停止存在之间的关系是什么?
- r - 在R中的表中合并多个保存为xls的html
- flutter - How to only change the time in DateTime?
- mysql - 在 phpmyadmin 中设置 int 类型长度无限制
- python - 使用多索引注释散点图
- github - GitHub App Web 应用程序流程:请求用户身份时的登录参数是什么?
- python - 无法设置虚拟环境或查看是否安装了 Anaconda,无法访问 bash 文件夹,“找不到命令”
- docker - 码头工人:standard_init_linux.go:211