python-3.x - numpy数组中的多个子字符串搜索具有很少的唯一值
问题描述
受这个问题的启发:假设我有一个包含多个 1Dnumpy
数组的列表,xs
我想知道有多少作为另一个更大的 1Dnumpy
数组的“子字符串”出现y
。
我们可以假设数组包含整数,并且它是某些整数的ifa
的子字符串和。b
a == b[p:q]
p
q
我提出的解决方案使用in
Pythonbytes
对象的运算符,但我认为如果xs
有很多元素它是低效的:
import numpy as np
N = 10_000 # number of arrays to search
M = 3 # "alphabet" size
K = 1_000_000 # size of the target array
xs = [np.random.randint(0, M, size=7) for _ in range(N)]
y = np.random.randint(0, M, size=K)
y_bytes = y.tobytes()
%time num_matches = sum(1 for x in xs if x.tobytes() in y_bytes)
# CPU times: user 1.03 s, sys: 17 µs, total: 1.03 s
# Wall time: 1.03 s
如果M
很大(y
任何一个中的可能值的数量xs
)很大,那么我想可以做很少的事情来加快速度。但是,对于小的,M
我想使用 trie 或类似的东西可能会有所帮助。有没有一种在 Python 中实现这一点的有效方法,可能使用numpy
/ numba
?
解决方案
对于 small M's
,我们可以根据其中的整数组合为每个标签分配xs
一个唯一标签。同样,我们可以利用卷积和缩放数组,因此,将每一个都减少xs
为一个标量。最后,我们使用匹配方法来检测和计数。
唯一的开销是从数组列表转换为数组。因此,如果列表创建本身预先优化为拥有一个数组,那么它将对最终的性能数据有很大帮助。
实现看起来像这样 -
x = np.asarray(xs) # convert to array, if not already done
s = M**np.arange(x.shape[1])
yr = np.convolve(y,s[::-1])
xr = x.dot(s)
# Final step : Match and get count
N = np.maximum(xr.max(),yr.max())+1 # or use s[-1]*M if M is small enough
l = np.zeros(N, dtype=bool)
l[yr] = True
count = l[xr].sum()
执行的替代方案Final step
替代#1:
sidx = yr.argsort()
idx = np.searchsorted(yr,xr,sorter=sidx)
idx[idx==len(yr)] = 0
count = (yr[sidx[idx]] == xr).sum()
替代#2:
from scipy.sparse import csr_matrix
ly = len(yr)
y_extent = yr.max()+1 # or use s[-1]*M if M is small enough
r = np.zeros(ly, dtype=np.uint64)
val = np.ones(ly, dtype=np.bool)
sp_mat = csr_matrix((val, (r,yr)), shape=(1,y_extent))
count = sp_mat[:,xr].sum()
替代#3:
对于更大的M
数字,我们可以使用empty
数组代替 -
l = np.empty(N, dtype=bool)
l[xr] = False
l[yr] = True
count = l[xr].sum()
进一步挖掘(numba
利用convolution
)
对主要提出的解决方案的分析表明,1D
卷积部分是耗时的部分。更进一步,我们看到1D
卷积码有一个特定的内核,本质上是几何的。这可以在O(n)
每次迭代重新使用边界元素时实现。请注意,与之前提出的内核相比,这基本上是一个倒置内核。所以,把所有这些改变,我们最终得到这样的东西 -
from numba import njit
@njit
def numba1(y, conv_out, M, L, N):
A = M**L
for i in range(1,N):
conv_out[i] = conv_out[i-1]*M + y[i+L-1] - y[i-1]*A
return conv_out
def numba_convolve(y, M, L):
N = len(y)-L+1
conv_out = np.empty(N, dtype=int)
conv_out[0] = y[:L].dot(M**np.arange(L-1,-1,-1))
return numba1(y, conv_out, M, L, N)
def intersection_count(xs, y):
x = np.asarray(xs) # convert to array, if not already done
L = x.shape[1]
s = M**np.arange(L-1,-1,-1)
xr = x.dot(s)
yr_numba = numba_convolve(y, M=M, L=L)
# Final step : Match and get count
N = s[0]*M
l = np.zeros(N, dtype=bool)
l[yr_numba] = True
count = l[xr].sum()
return count
基准测试
我们将重新使用问题中的设置。
In [42]: %%timeit
...: y_bytes = y.tobytes()
...: p = sum(1 for x in xs if x.tobytes() in y_bytes)
927 ms ± 3.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [43]: %timeit intersection_count(xs, y)
7.55 ms ± 71.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
如前所述,转换为数组可能是瓶颈。所以,让我们也为这部分计时 -
In [44]: %timeit np.asarray(xs)
3.41 ms ± 10.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
所以,数组转换部分大约45%
是总运行时间,这是一个相当大的部分。因此,在这一点上,使用 2D 数组而不是 1D 数组列表的建议变得至关重要。好处是数组数据为我们提供了矢量化能力,从而提高了整体性能。只是为了强调 2D 阵列的可用性,这里是有和没有的加速 -
In [45]: 927/7.55
Out[45]: 122.78145695364239
In [46]: 927/(7.55-3.41)
Out[46]: 223.91304347826087
推荐阅读
- dataframe - 为什么 Dask 在我的 Dataframe 中填写“foo”和 1
- javascript - 如果您只知道属性名称,您能否通过 setter 和 getter 从对象的列表中定义属性?
- python - 如何让3个线程依次打印
- c++ - 上限 time_point 到运行时定义的持续时间
- python - 使用 xml.dom.minidom 在 Python 中进行 XML 注入
- php - 过滤整个 $_POST 变量 - 这会安全吗?
- sql - 如何修复 OLEDB、SQL 中的更新和设置命令?
- amazon-web-services - 没有得到詹金斯解锁路径
- java - 如何解决:数独求解器堆栈溢出问题
- java - 如何从文本中提取单个单词和 url?