首页 > 解决方案 > numpy数组的快速条件重叠窗口(框架)

问题描述

我有一个庞大的 numpy 数组(一维)列表,它们是不同事件的时间序列。每个点都有一个标签,我想根据其标签对 numpy 数组进行窗口化。我的标签是 0、1 和 2。每个窗口都有固定大小 M。

每个窗口的标签将是窗口中可用的最大标签。因此,如果一个窗口由 0 和 1 个标记的数据点组成,则整个窗口的标签将为 1。

但问题是,窗口不是标签不可知论的。由于类不平衡,我只想在标签 1 和 2 的情况下进行重叠窗口。

到目前为止,我已经编写了这段代码:

# conditional framing
data = []
start_cursor = 0
while start_cursor < arr.size:
  end_cursor = start_cursor + window_size
  data.append(
    {
      "frame": arr[start_cursor:end_cursor],
      "label": y[start_cursor:end_cursor].max(),
    }
  )
  start_cursor = end_cursor
  if np.any(y[start_cursor, end_cursor] != 0):
    start_cursor = start_cursor - overlap_size        

但这显然太冗长而且效率很低,特别是因为我将在我庞大的单独数组列表中调用这个 while 循环。

编辑:更多地解释问题。想象一下,你要对一个固定长度 M 的信号进行窗口化。如果窗口中只存在 0 个标签点,则相邻窗口之间不会有重叠。但如果存在标签 1 和 2,则两个信号之间会有重叠,百分比为 p%。

标签: pythonarraysnumpysignal-processingwindowing

解决方案


我认为这可以满足您的要求。检查的可视化效果不是很好,但它可以帮助您了解窗口是如何工作的。希望我正确理解了您的问题,这就是您要尝试做的。只要时间序列中有 1 或 2(而不是 0),窗口就会向前移动整个窗口长度的一部分(这里是 50%)。 开窗方法

要检查如何执行此操作,请从示例时间序列开始:

import matplotlib.pylab as plt
import numpy as np

N = 5000 # time series length

# create some sort of data set to work with
x = np.zeros(N)
# add a few 1s and 2s to the list (though really they are the same for the windowing)
y = np.random.random(N)
x[y < 0.01] = 1
x[y < 0.005] = 2

# assign a window length
M = 50 # window length
overlap = 0.5 # assume 50% overlap
M_overlap = int(M * (1-overlap))

我的方法是对您的时间序列感兴趣的窗口求和。如果 sum ==0,则窗口之间没有重叠,如果是>0则存在重叠。那么,问题就变成了我们应该如何有效地计算这些总和?我比较了两种方法。第一个是简单地遍历时间序列,第二个是使用卷积(这要快得多)。对于第一个,我还探索了求和后评估窗口大小的不同方法。

求和(慢版)

def window_sum1():
    # start of windows in list windows
    windows = [0,]
    while windows[-1] + M < N:
        check = sum(x[windows[-1]:windows[-1]+M]) == 0
        windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)
        if windows[-1] + M > N:
            windows.pop()
            break
    # plotting stuff for checking
    return(windows)
Niter = 10**4
print(timeit.timeit(window_sum1, number = Niter))
# 29.201083058

所以这种方法在大约 30 秒内经历了 10,000 个长度为 5000 的时间序列。但是windows.append(windows[-1] + M_overlap + (M - M_overlap) * check)可以在 if 语句中简化该行。

求和(快速版,比慢速版快 33%)

def window_sum2():
    # start of windows in list windows
    windows = [0,]
    while windows[-1] + M < N:
        check = sum(x[windows[-1]:windows[-1]+M]) == 0
        if check:
            windows.append(windows[-1] + M)
        else:
            windows.append(windows[-1] + M_overlap)
        if windows[-1] + M > N:
            windows.pop()
            break
    # plotting stuff for checking
    return(windows)
print(timeit.timeit(window_sum2, number = Niter))
# 20.456240447000003

我们看到 if 语句的时间减少了 1/3。

卷积(比快速求和快 85%)

通过使用numpy.convolve将时间序列与感兴趣的窗口进行卷积,我们可以使用信号处理来获得更快的速度。(免责声明:我从这个问题的公认答案中得到了这个想法。)当然,从上面采用更快的窗口大小评估也是有意义的。

def window_conv():
    a = np.convolve(x,np.ones(M,dtype=int),'valid')
    windows = [0,]
    while windows[-1] + M < N:
        if a[windows[-1]]:
            windows.append(windows[-1] + M_overlap)
        else:
            windows.append(windows[-1] + M)
        if windows[-1] + M > N:
            windows.pop()
            break
    return(windows)
print(timeit.timeit(window_conv, number = Niter))
#3.3695770570000008

滑动窗口

我要补充的最后一件事是,如该问题的其中一条评论所示,截至目前,numpy 1.20有一个名为slip_window_view的函数。我仍然在numpy 1.19运行并且无法测试它是否比卷积更快。


推荐阅读