首页 > 解决方案 > Spark - 如何将窗口中的行与步幅值连接起来

问题描述

我有如下数据框(时间序列数据):

    value category
    a1    c1
    a2    c1
    a3    c1
    a4    c1
    a5    c1
    a6    c1
    a7    c1
    a8    c1
    a1    c2
    a2    c2
    a3    c2
    a4    c2
    a5    c2
    a6    c2
    a7    c2
    a8    c2

我想要做的是使用窗口滑动,因此窗口大小 = 4,跨步 = 2,这意味着窗口包含 4 行,我们将窗口移动 2 行。预期的结果应该是这样的:

    window value      category
    [a1, a2, a3, a4]    c1
    [a3, a4, a5, a6]    c1
    [a5, a6, a7, a8]    c1
    [a1, a2, a3, a4]    c2
    [a3, a4, a5, a6]    c2
    [a5, a6, a7, a8]    c2

我尝试过使用窗口功能。但是,据我所知,该窗口将遍历我的 Dataframe 中的所有行。示例源代码为:

# define the window spec with 4 rows following        
windowSpec = Window.partitionBy(col("category").orderBy(col("value ")).rowsBetween(0, 3)  
# get the window data
window_data = df.withColumn('window_data',collect_list(col("value")).over(windowSpec))

所以结果会是这样的:

    window_data      category
    [a1, a2, a3, a4]    c1
    [a2, a3, a4, a5]    c1
    [a3, a4, a5, a6]    c1
    [a4, a5, a6, a7]    c1
    ...

更新:实际上,我们可以为 Dataframe 的每一行连接一个窗口中的所有行,然后只过滤某些特定位置的一些行。但这似乎成本很高,因为我们必须对 Dataframe 中的所有行进行两次迭代,并且我们必须连接我们稍后将忽略的那些。直觉上,我认为我们可以有一个更优化的选择。

你们能推荐任何方法来获得我想要的结果吗?

提前致谢 :-)

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


长,

这是一个scala代码(非常类似于python):

val window = Window.partitionBy(col("category")).orderBy("value")
val window_data = df.withColumn("window_data", collect_list(col("value")).over(window.rowsBetween(0,3)))
    .withColumn("rownum", row_number().over(window))
    .where(pmod($"rownum", lit(2))===1 && size(col("window_data")) === 4)
    .drop("rownum")

window_data.show(false)

输出 :

+-----+--------+----------------+
|value|category|window_data     |
+-----+--------+----------------+
|a1   |c1      |[a1, a2, a3, a4]|
|a3   |c1      |[a3, a4, a5, a6]|
|a5   |c1      |[a5, a6, a7, a8]|
|a1   |c2      |[a1, a2, a3, a4]|
|a3   |c2      |[a3, a4, a5, a6]|
|a5   |c2      |[a5, a6, a7, a8]|
+-----+--------+----------------+

这个想法是通过与用于该window_data字段的窗口相同的窗口计算每一行的位置,并且只保留奇数行。此外,我们将删除在 window_data 中找不到 4 个元素的行(而不是添加空值)。

编辑:这是查询计划。它显示单个窗口用于rownumwindow_data计算(没有额外的随机/排序)。

== Physical Plan ==
*(2) Project [value#5, category#6, window_data#10]
+- *(2) Filter ((isnotnull(rownum#15) && (pmod(rownum#15, 2) = 1)) && (size(window_data#10) = 4))
   +- Window [collect_list(value#5, 0, 0) windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 3)) AS window_data#10, row_number() windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rownum#15], [category#6], [value#5 ASC NULLS FIRST]
      +- *(1) Sort [category#6 ASC NULLS FIRST, value#5 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(category#6, 200)
            +- LocalTableScan [value#5, category#6]

推荐阅读