python - Spark - 如何将窗口中的行与步幅值连接起来
问题描述
我有如下数据框(时间序列数据):
value category
a1 c1
a2 c1
a3 c1
a4 c1
a5 c1
a6 c1
a7 c1
a8 c1
a1 c2
a2 c2
a3 c2
a4 c2
a5 c2
a6 c2
a7 c2
a8 c2
我想要做的是使用窗口滑动,因此窗口大小 = 4,跨步 = 2,这意味着窗口包含 4 行,我们将窗口移动 2 行。预期的结果应该是这样的:
window value category
[a1, a2, a3, a4] c1
[a3, a4, a5, a6] c1
[a5, a6, a7, a8] c1
[a1, a2, a3, a4] c2
[a3, a4, a5, a6] c2
[a5, a6, a7, a8] c2
我尝试过使用窗口功能。但是,据我所知,该窗口将遍历我的 Dataframe 中的所有行。示例源代码为:
# define the window spec with 4 rows following
windowSpec = Window.partitionBy(col("category").orderBy(col("value ")).rowsBetween(0, 3)
# get the window data
window_data = df.withColumn('window_data',collect_list(col("value")).over(windowSpec))
所以结果会是这样的:
window_data category
[a1, a2, a3, a4] c1
[a2, a3, a4, a5] c1
[a3, a4, a5, a6] c1
[a4, a5, a6, a7] c1
...
更新:实际上,我们可以为 Dataframe 的每一行连接一个窗口中的所有行,然后只过滤某些特定位置的一些行。但这似乎成本很高,因为我们必须对 Dataframe 中的所有行进行两次迭代,并且我们必须连接我们稍后将忽略的那些。直觉上,我认为我们可以有一个更优化的选择。
你们能推荐任何方法来获得我想要的结果吗?
提前致谢 :-)
解决方案
长,
这是一个scala代码(非常类似于python):
val window = Window.partitionBy(col("category")).orderBy("value")
val window_data = df.withColumn("window_data", collect_list(col("value")).over(window.rowsBetween(0,3)))
.withColumn("rownum", row_number().over(window))
.where(pmod($"rownum", lit(2))===1 && size(col("window_data")) === 4)
.drop("rownum")
window_data.show(false)
输出 :
+-----+--------+----------------+
|value|category|window_data |
+-----+--------+----------------+
|a1 |c1 |[a1, a2, a3, a4]|
|a3 |c1 |[a3, a4, a5, a6]|
|a5 |c1 |[a5, a6, a7, a8]|
|a1 |c2 |[a1, a2, a3, a4]|
|a3 |c2 |[a3, a4, a5, a6]|
|a5 |c2 |[a5, a6, a7, a8]|
+-----+--------+----------------+
这个想法是通过与用于该window_data
字段的窗口相同的窗口计算每一行的位置,并且只保留奇数行。此外,我们将删除在 window_data 中找不到 4 个元素的行(而不是添加空值)。
编辑:这是查询计划。它显示单个窗口用于rownum
列window_data
计算(没有额外的随机/排序)。
== Physical Plan ==
*(2) Project [value#5, category#6, window_data#10]
+- *(2) Filter ((isnotnull(rownum#15) && (pmod(rownum#15, 2) = 1)) && (size(window_data#10) = 4))
+- Window [collect_list(value#5, 0, 0) windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 3)) AS window_data#10, row_number() windowspecdefinition(category#6, value#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rownum#15], [category#6], [value#5 ASC NULLS FIRST]
+- *(1) Sort [category#6 ASC NULLS FIRST, value#5 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(category#6, 200)
+- LocalTableScan [value#5, category#6]
推荐阅读
- c# - 如何检查验证器是否在页面上验证和验证失败
- sustainsys-saml2 - Sustainsys.SAML2 使用 http-redirect 而不是 http-post
- c++ - CMake 外部项目 QT
- apache-spark - pyspark df.write.json('s3e://somepath') 是二进制的
- javascript - jquery:如何使用 onchange 更新输入值?
- python - 如何从 psql 查询返回中删除部分数字?
- java - DynamicJasper - 如何将子报表添加为列?
- c# - WPF Combobox 选择焦点项目
- ios - CNError.Code.communicationError 错误是什么意思?
- string - 如何在 APL 中连接字母和数字的组合?