High-performance rolling/window aggregations over time-series data in PySpark

Problem description

The basic problem

I have a dataset of roughly 10 billion rows. I'm looking for the most performant way to compute rolling/windowed aggregates/metrics (sum, mean, min, max, standard deviation) over four different time windows (3 days, 7 days, 14 days, 21 days).

Spark / AWS EMR specs

Spark version: 2.4.4
EC2 instance type: r5.24xlarge
core EC2 instances: 10
number of PySpark partitions: 600

Overview

I've read a bunch of SO posts that address either the mechanics of computing rolling statistics or how to make Window functions faster, but none of them combines the two concepts in a way that solves my problem. Below I show some options that produce what I need, but I need them to run much faster on my real dataset, so I'm looking for faster/better suggestions.

My dataset is structured as follows, but has roughly 10 billion rows:

+--------------------------+----+-----+
|date                      |name|value|
+--------------------------+----+-----+
|2020-12-20 17:45:19.536796|1   |5    |
|2020-12-21 17:45:19.53683 |1   |105  |
|2020-12-22 17:45:19.536846|1   |205  |
|2020-12-23 17:45:19.536861|1   |305  |
|2020-12-24 17:45:19.536875|1   |405  |
|2020-12-25 17:45:19.536891|1   |505  |
|2020-12-26 17:45:19.536906|1   |605  |
|2020-12-20 17:45:19.536796|2   |10   |
|2020-12-21 17:45:19.53683 |2   |110  |
|2020-12-22 17:45:19.536846|2   |210  |
|2020-12-23 17:45:19.536861|2   |310  |
|2020-12-24 17:45:19.536875|2   |410  |
|2020-12-25 17:45:19.536891|2   |510  |
|2020-12-26 17:45:19.536906|2   |610  |
|2020-12-20 17:45:19.536796|3   |15   |
|2020-12-21 17:45:19.53683 |3   |115  |
|2020-12-22 17:45:19.536846|3   |215  |

I need my dataset to look like the following. Note: the window statistics are shown for the 7-day window, but I also need the other three windows.

+--------------------------+----+-----+----+-----+---+---+------------------+
|date                      |name|value|sum |mean |min|max|stddev            |
+--------------------------+----+-----+----+-----+---+---+------------------+
|2020-12-20 17:45:19.536796|1   |5    |5   |5.0  |5  |5  |NaN               |
|2020-12-21 17:45:19.53683 |1   |105  |110 |55.0 |5  |105|70.71067811865476 |
|2020-12-22 17:45:19.536846|1   |205  |315 |105.0|5  |205|100.0             |
|2020-12-23 17:45:19.536861|1   |305  |620 |155.0|5  |305|129.09944487358058|
|2020-12-24 17:45:19.536875|1   |405  |1025|205.0|5  |405|158.11388300841898|
|2020-12-25 17:45:19.536891|1   |505  |1530|255.0|5  |505|187.08286933869707|
|2020-12-26 17:45:19.536906|1   |605  |2135|305.0|5  |605|216.02468994692867|
|2020-12-20 17:45:19.536796|2   |10   |10  |10.0 |10 |10 |NaN               |
|2020-12-21 17:45:19.53683 |2   |110  |120 |60.0 |10 |110|70.71067811865476 |
|2020-12-22 17:45:19.536846|2   |210  |330 |110.0|10 |210|100.0             |
|2020-12-23 17:45:19.536861|2   |310  |640 |160.0|10 |310|129.09944487358058|
|2020-12-24 17:45:19.536875|2   |410  |1050|210.0|10 |410|158.11388300841898|
|2020-12-25 17:45:19.536891|2   |510  |1560|260.0|10 |510|187.08286933869707|
|2020-12-26 17:45:19.536906|2   |610  |2170|310.0|10 |610|216.02468994692867|
|2020-12-20 17:45:19.536796|3   |15   |15  |15.0 |15 |15 |NaN               |
|2020-12-21 17:45:19.53683 |3   |115  |130 |65.0 |15 |115|70.71067811865476 |
|2020-12-22 17:45:19.536846|3   |215  |345 |115.0|15 |215|100.0             |

Details

For readability, I'll only do a single window in these examples. Things I have tried:

  1. Basic Window().over() syntax
  2. Converting the window values into an array column and using higher-order functions
  3. Spark SQL

Setup

import datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType
import pandas as pd
import numpy as np

spark = SparkSession.builder.appName('example').getOrCreate()

# create spark dataframe
n = 7
names = [1, 2, 3]
date_list = [datetime.datetime.today() - datetime.timedelta(days=(n-x)) for x in range(n)]
values = [x*100 for x in range(n)]

rows = []
for name in names:
    for d, v in zip(date_list, values):
        rows.append(
            {
                "name": name,
                "date": d,
                "value": v+(5*name)
            }
        )
df = spark.createDataFrame(data=rows)

# set up the window: order by the timestamp cast to epoch seconds so that the
# rangeBetween() bounds can be given in seconds; a lower bound of
# -window_days * 86400 + 1 covers the trailing 7 days up to and including
# the current row
window_days = 7
window = (
    Window
    .partitionBy(F.col("name"))
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-window_days * 60 * 60 * 24 + 1, Window.currentRow)
)

1. Basic

This creates multiple window specifications, as shown here, and therefore executes serially, so it runs very slowly on large datasets.

status_quo = (df
    .withColumn("sum",F.sum(F.col("value")).over(window))
    .withColumn("mean",F.avg(F.col("value")).over(window))
    .withColumn("min",F.min(F.col("value")).over(window))
    .withColumn("max",F.max(F.col("value")).over(window))
    .withColumn("stddev",F.stddev(F.col("value")).over(window))
)
status_quo.show()
status_quo.explain()
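
For reference, a minimal sketch (my own addition, not from the original post) of how the same pattern would extend to all four required windows (3, 7, 14, 21 days); the column-name convention (sum3, mean3, ...) is assumed. Each duration still gets its own window specification, so the serial-execution concern above applies to every one of them.

multi = df
for days in [3, 7, 14, 21]:
    w = (
        Window
        .partitionBy(F.col("name"))
        .orderBy(F.col("date").cast("timestamp").cast("long"))
        .rangeBetween(-days * 60 * 60 * 24 + 1, Window.currentRow)
    )
    # one window spec per duration, five aggregates over each
    for agg_name, agg_func in [
        ("sum", F.sum), ("mean", F.avg), ("min", F.min),
        ("max", F.max), ("stddev", F.stddev),
    ]:
        multi = multi.withColumn("{}{}".format(agg_name, days), agg_func(F.col("value")).over(w))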

2. Array column --> higher-order functions

Per this answer, this seems to create fewer window specifications, but the aggregate() function syntax makes no sense to me, I don't know how to write stddev with higher-order functions, and the performance didn't seem much better in small tests.

@F.udf(returnType=FloatType())
def array_stddev(row_value):
    """
    temporary function since I don't know how to write higher order standard deviation
    """
    return np.std(row_value, dtype=float).tolist()

# 1. collect window into array column
# 2. use higher order (array) functions to calculate aggregations over array (window values)
# Question: how to write standard deviation in aggregate()
hof_example = (
    df
    .withColumn("value_array", F.collect_list(F.col("value")).over(window))
    .withColumn("sum_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
    .withColumn("mean_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    .withColumn("max_example", F.array_max(F.col("value_array")))
    .withColumn("min_example", F.array_min(F.col("value_array")))
    .withColumn("std_example", array_stddev(F.col("value_array")))
)
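
One possible way to get the standard deviation without the Python UDF (a sketch based on the two-pass aggregate() expression used in the accepted answer further down, so treat the exact expression as an assumption): compute the mean first, then accumulate squared deviations and take the square root at the end.

hof_no_udf = (
    df
    .withColumn("value_array", F.collect_list(F.col("value")).over(window))
    .withColumn("mean_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    # sample standard deviation: sum of squared deviations from the mean, divided by n - 1
    .withColumn("std_example", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean_example) * (x - mean_example), acc -> sqrt(acc / (size(value_array) - 1)))'))
)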

3. Spark SQL

This seems to be the fastest in simple tests. The only (minor) issue is that the rest of my codebase uses the DataFrame API. It seems faster in small tests, but it has not been tested on the full dataset.

df.createOrReplaceTempView("df")
sql_example = spark.sql(
    """
    SELECT 
        *
        , sum(value)
        OVER (
            PARTITION BY name
            ORDER BY CAST(date AS timestamp) 
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        ) AS sum
        , mean(value)
        OVER (
            PARTITION BY name
            ORDER BY CAST(date AS timestamp) 
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        ) AS mean
        , min(value)
        OVER (
            PARTITION BY name
            ORDER BY CAST(date AS timestamp) 
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        ) AS min
        , max(value)
        OVER (
            PARTITION BY name
            ORDER BY CAST(date AS timestamp) 
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        ) AS max
        , stddev(value)
        OVER (
            PARTITION BY name
            ORDER BY CAST(date AS timestamp) 
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        ) AS stddev
    FROM df"""
)
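
If staying in the DataFrame API matters, the same OVER clauses can also be embedded via selectExpr() (a minimal sketch, shown only for the sum column; I believe this behaves the same as the spark.sql() version, but I have not benchmarked it):

sql_in_df_api = df.selectExpr(
    "*",
    """sum(value) OVER (
        PARTITION BY name
        ORDER BY CAST(date AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
    ) AS sum""",
)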

Tags: apache-spark, pyspark, apache-spark-sql, window-functions, rolling-computation

Solution

Note: I'm marking this as the accepted answer for now. If anyone finds a faster/better approach, please let me know and I'll switch it!

EDIT for clarification: the calculations shown here assume input dataframes preprocessed to day level for day-level rolling calculations.

After I posted the question, I tested several different options on my real dataset (and got some input from coworkers), and I believe the fastest way (for large datasets) is to use pyspark.sql.functions.window() with groupby().agg() instead of pyspark.sql.window.Window().

A similar answer can be found here.

The steps to make this work are:

  1. Sort the dataframe by name and date (in the example dataframe)
  2. .persist() the dataframe
  3. Compute the grouped dataframe using F.window() for each required window and join it back to the df

The best/easiest way to see this is on the SQL diagram in the Spark UI. When Window() is used, the SQL execution is completely sequential. However, when F.window() is used, the diagram shows parallelization! Note: on small datasets Window() still seems to be faster.

In my tests with real data on a 7-day window, Window() was slower than F.window(). The only downside is that F.window() is a bit less convenient to use. I show some code and screenshots below for reference.

Fastest solution found (F.window() with groupby().agg())

# this turned out to be super important for tricking spark into parallelizing things
df = df.orderBy("name", "date")
df.persist()

fwindow7 = F.window(
    F.col("date"),
    windowDuration="7 days",
    slideDuration="1 days",
).alias("window")

gb7 = (
    df
    .groupBy(F.col("name"), fwindow7)
    .agg(
        F.sum(F.col("value")).alias("sum7"),
        F.avg(F.col("value")).alias("mean7"),
        F.min(F.col("value")).alias("min7"),
        F.max(F.col("value")).alias("max7"),
        F.stddev(F.col("value")).alias("stddev7"),
        F.count(F.col("value")).alias("cnt7")
    )
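    # note: the window's end bound is exclusive, so end - 1 day is the last day
    # actually covered by the window; it becomes the join key back onto df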
    .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
    .drop("window")
)
window_function_example = df.join(gb7, ["name", "date"], how="left")


fwindow14 = F.window(
    F.col("date"),
    windowDuration="14 days",
    slideDuration="1 days",
).alias("window")

gb14 = (
    df
    .groupBy(F.col("name"), fwindow14)
    .agg(
        F.sum(F.col("value")).alias("sum14"),
        F.avg(F.col("value")).alias("mean14"),
        F.min(F.col("value")).alias("min14"),
        F.max(F.col("value")).alias("max14"),
        F.stddev(F.col("value")).alias("stddev14"),
        F.count(F.col("value")).alias("cnt14")
    )
    .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
    .drop("window")
)
window_function_example = window_function_example.join(gb14, ["name", "date"], how="left")
window_function_example.orderBy("name", "date").show(truncate=True)
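
Since the code above only shows the 7-day and 14-day windows, here is a sketch (my own generalization, not part of the original answer) of wrapping the same groupBy/F.window() pattern in a helper and looping over all four required durations:

def rolling_stats(base_df, days):
    """Trailing `days`-day stats per name, keyed by the last day of each window."""
    fw = F.window(
        F.col("date"),
        windowDuration="{} days".format(days),
        slideDuration="1 days",
    ).alias("window")
    suffix = str(days)
    return (
        base_df
        .groupBy(F.col("name"), fw)
        .agg(
            F.sum("value").alias("sum" + suffix),
            F.avg("value").alias("mean" + suffix),
            F.min("value").alias("min" + suffix),
            F.max("value").alias("max" + suffix),
            F.stddev("value").alias("stddev" + suffix),
            F.count("value").alias("cnt" + suffix),
        )
        .withColumn("date", F.date_sub(F.col("window.end").cast("date"), 1))
        .drop("window")
    )

result = df
for days in [3, 7, 14, 21]:
    result = result.join(rolling_stats(df, days), ["name", "date"], how="left")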

SQL diagram

[Screenshot: Spark UI SQL diagram for the GroupBy / F.window() approach]

Option 2 from the original question (higher-order functions applied over Window())

window7 = (
    Window
    .partitionBy(F.col("name"))
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-7 * 60 * 60 * 24 + 1, Window.currentRow)
)
window14 = (
    Window
    .partitionBy(F.col("name"))
    .orderBy(F.col("date").cast("timestamp").cast("long"))
    .rangeBetween(-14 * 60 * 60 * 24 + 1, Window.currentRow)
)
hof_example = (
    df
    .withColumn("value_array", F.collect_list(F.col("value")).over(window7))
    .withColumn("sum7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
    .withColumn("mean7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    .withColumn("max7", F.array_max(F.col("value_array")))
    .withColumn("min7", F.array_min(F.col("value_array")))
    .withColumn("std7", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean7)*(x - mean7), acc -> sqrt(acc / (size(value_array) - 1)))'))
    .withColumn("count7", F.size(F.col("value_array")))
    .drop("value_array")
)
hof_example = (
    hof_example
    .withColumn("value_array", F.collect_list(F.col("value")).over(window14))
    .withColumn("sum14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x)'))
    .withColumn("mean14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + x, acc -> acc / size(value_array))'))
    .withColumn("max14", F.array_max(F.col("value_array")))
    .withColumn("min14", F.array_min(F.col("value_array")))
    .withColumn("std14", F.expr('AGGREGATE(value_array, DOUBLE(0), (acc, x) -> acc + (x - mean14)*(x - mean14), acc -> sqrt(acc / (size(value_array) - 1)))'))
    .withColumn("count14", F.size(F.col("value_array")))
    .drop("value_array")
)

hof_example.show(truncate=True)

SQL diagram snippet

[Screenshot: Spark UI SQL diagram for the higher-order-function approach]

