apache-spark - How do I reuse the same partitioning across multiple logical offset windows in PySpark?
Problem description
I have a dataframe df (Spark 2.4) with the following schema:
root
 |-- segId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- val1: double (nullable = true)
where

- segId is a segment (think of it as a unique identifier)
- time is the timestamp at which some measurement was taken
- val1 is the measured value

I need to compute the mean (or some custom aggregation) of val1 over several rangeBetween frames. For example, for each segment I want the mean over the last 1 minute, the last 2 minutes, ..., up to the last 100 minutes; spelled out naively, that is one window column per look-back horizon, as in the sketch below.
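A minimal sketch of that naive spelling (the loop bounds and generated column names are illustrative; assumes df as defined above):

from pyspark.sql import Window
from pyspark.sql import functions as F

# One window column per look-back horizon, all sharing the same
# partitioning (segId) and ordering (epoch seconds of `time`).
win = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))
mean_cols = [
    F.mean("val1").over(win.rangeBetween(-(m * 60 - 1), 0)).alias("mean{}Mins".format(m))
    for m in range(1, 101)
]
df = df.select("*", *mean_cols)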
I don't want to create 100 separate windows (partitioned and sorted 100 times). I want to create one physical window (partitioned by segId once and ordered by time once) and then use a rangeBetween over the last n minutes (a logical offset over the already-partitioned set).

A code example computing only the last 1, 2 and 3 minutes:
from pyspark.sql import Window
from pyspark.sql import functions as F

# One physical window: partition by segId, order by epoch seconds of `time`.
win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))
df = (
    df.repartition("segId")
    .orderBy(F.col("time").cast("long"))
    .withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween(-(60 - 1), 0)))
    # the dataframe also carries a val2 column, as the plan below shows
    .withColumn("mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2 * 60 - 1), 0)))
    .withColumn("mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3 * 60 - 1), 0)))
)
df.show()
The physical plan shows that the example above uses three Window operators:
== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
+- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
+- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
+- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
+- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
+- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
+- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
+- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(segId#0, 1000)
+- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
+- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
+- Exchange hashpartitioning(segId#0, 1000)
+- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
+- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]
My questions are:
- Will Spark reuse the same physical partitioning for the multiple logical windows (i.e., partition and sort once)? Or will it create a separate partition-and-sort for each rangeBetween (computationally expensive)?
- Any suggestions on how to improve the logic above computationally, for different custom aggregations over different rangeBetween frames on the same partitioning?
Solution
Will Spark reuse the same physical partitioning for the multiple logical windows (i.e., partition and sort once)? Or will it create a separate partition-and-sort for each rangeBetween (computationally expensive)?
I think so, too. Only the Exchange operators repartition the data (close to the data source, Scan ExistingRDD). In the plan above, all three Window operators sit on top of the single Exchange hashpartitioning(segId#0, 1000); each additional window only adds a Sort, not another shuffle. One way to check this on your own data is sketched below.
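For instance (a minimal sketch reusing the question's names; df is the source dataframe), build the plan incrementally and confirm that adding a second window column does not add an Exchange:

from pyspark.sql import Window
from pyspark.sql import functions as F

win = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))
one = df.withColumn("mean1Mins", F.mean("val1").over(win.rangeBetween(-(60 - 1), 0)))
two = one.withColumn("mean2Mins", F.mean("val1").over(win.rangeBetween(-(2 * 60 - 1), 0)))

# Both plans should show a single "Exchange hashpartitioning(segId, ...)";
# the second window contributes only an extra Sort + Window operator.
one.explain()
two.explain()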
Any suggestions on how to improve the logic above computationally, for different custom aggregations over different rangeBetween frames on the same partitioning?
No idea here, sorry.