首页 > 解决方案 > 如何改进大数据的矢量化滑动窗口?

问题描述

我需要在具有 600 万个时间步长和每个时间步长 8 个特征的时间序列上使用 python 中的滑动窗口。我使用矢量化版本和带有 for 循环的版本创建了一组滑动窗口。for 循环要快得多。我已经强调了矢量化版本中耗时的步骤。有没有加快矢量化版本的好方法?

这是矢量化版本:

def vectorized_window(T, l: int, stride=1, start_idxs=None, output_type="data"):
    """Takes a time series, T, and breakes it into subsequences of length l.

    This is a vectorized version of window creation. It should run faster because it uses
    matrix operations but for very large data, this is not true because the indexing operation
    at the end is slow.

    Args:
        T: A np.ndarray of shape (|T|, features) where |T| is the number of time steps.
        l: An int designating the length of the window.
        stride: The number of time steps to move the window forward by. Default is
            1 time step.
        start_idxs: A ndarray or None (default). If start_idxs is specified, these will be used as the start
            indices for each window. stride will be ignored. Default of None will
            sequentially slide the window by stride steps. Shape should be (num of indices,)
        output_type: "data" or "idxs". The default of "data" will compute and return the full window (ndarray)
            with the actual data values for each time step. If "idxs" is specified, it will return a ndarray
            of shape (num windows, 2) where windows[:,0] are the start indices and windows[:,1] are the end indices.

    Returns:
        windows: a list of ndarrays that represent windows, with length l, of the time series. The shape is
            either (num windows, l, num features) or (num windows, 2) depending on output_type.
    """
    window_idxs = np.expand_dims(np.arange(l), 0)
    if output_type != "data":
        window_idxs = window_idxs[[0, -1]]
    if start_idxs is None:
        start_idxs = np.expand_dims(np.arange(T.shape[0]-l, step=stride), 0).T
    else:
        start_idxs = np.expand_dims(start_idxs, 0).T
    
    if output_type != "data":
        windows = window_idxs + start_idxs
    else:
        sub_windows = (window_idxs + start_idxs)
        windows = T[sub_windows] # This is the slow step
    return windows

这是带有 for 循环的版本:

def create_window(T, l: int, stride=1):
    """Takes a time series, T, and breakes it into subsequences of length l.

    Args:
        T: A list or np.ndarray representing a univariate or multivariate time series.
            If it is a multivarite time series, it must be a numpy array of shape
            (time steps, features). If features is in axis 0, this will not work.
        l: An int designating the length of the window.
        stride: The number of time steps to move the window forward by. Default is
            1 time step.

    Returns:
        windows: a list of ndarrays that represent windows, with length l, of the time series.
    """
    if "list" in str(type(T)):
        T = np.asarray(T)

    n_T= T.shape[0]
    windows = []

    for i in range(0, n_T - l, stride):
        window = T[i:i+l]
        windows.append(window)
    
    return windows

这两个版本做的事情并不完全相同。如果 output_type 不是“数据”,矢量化版本还将返回每个子序列的边界索引。但是,这种差异不会显着影响整体速度。

非常感谢任何优化此代码的建议!

标签: pythonalgorithmperformanceoptimizationvectorization

解决方案


我建议你使用tsflex,这个包有一个非常有效的窗口步幅特征提取

你可以很方便地计算你的 8 个特征,例如,看这个例子

import pandas as pd; import numpy as np; import scipy.stats as ss
from tsflex.features import MultipleFeatureDescriptors, FeatureCollection

# 1. -------- Get your time-indexed data --------
url = "https://github.com/predict-idlab/tsflex/raw/main/examples/data/empatica/"
# Contains 1 column; ["TMP"] - 4 Hz sampling rate
data_tmp = pd.read_parquet(url+"tmp.parquet").set_index("timestamp")
# Contains 3 columns; ["ACC_x", "ACC_y", "ACC_z"] - 32 Hz sampling rate
data_acc = pd.read_parquet(url+"acc.parquet").set_index("timestamp")

# 2. -------- Construct your feature collection --------
fc = FeatureCollection(
    MultipleFeatureDescriptors(
          functions=[np.min, np.max, np.mean, np.std, np.median, ss.skew, ss.kurtosis],
          series_names=["TMP", "ACC_x", "ACC_y"], # Use 3 multimodal signals 
          windows=["5min", "7.5min"],  # Use 5 minutes and 7.5 minutes 
          strides="2.5min",  # With steps of 2.5 minutes
    )
)

# 3. -------- Calculate features --------
fc.calculate(data=[data_tmp, data_acc])

注意:tsflex 要求您的数据具有时间索引,并且窗口和步幅(= 步长)参数是基于时间的(例如,“1min”、“5s”、“3h”)。

您可以在此处查看 tsflex 的其他示例。

免责声明:这个库是由我和一些同事创建的。


推荐阅读