How to reuse the same partitioning for multiple logical offset windows in PySpark?

Problem description

I have a DataFrame df (Spark 2.4) with the following schema:

root
 |-- segId: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- val1: double (nullable = true)

I need to compute the mean of val1 (or some custom aggregation) over several rangeBetween frames. For example, for each segment I want the mean over the last 1 minute, 2 minutes, ..., up to 100 minutes.

I don't want to create 100 separate windows (each of which would partition and sort the data again, 100 times in total). Instead, I want to create one physical window (partitioned by segId and sorted by time once) and then use a rangeBetween over the last n minutes (a logical offset over the already partitioned and sorted data).

A code example that computes only the last 1, 2, and 3 minutes:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One physical window: partition by segId, order by the event time in epoch seconds.
win_physical = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))

df = (
    df.repartition("segId")
    .orderBy(F.col("time").cast("long"))
    # Logical frames over the same window spec: last 1, 2 and 3 minutes (in seconds).
    .withColumn("mean1Mins", F.mean("val1").over(win_physical.rangeBetween(-(60 - 1), 0)))
    .withColumn(
        "mean2Mins", F.mean("val2").over(win_physical.rangeBetween(-(2 * 60 - 1), 0))
    )
    .withColumn(
        "mean3Mins", F.mean("val1").over(win_physical.rangeBetween(-(3 * 60 - 1), 0))
    )
    .show()
)
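
For the full use case (the last 1, 2, ..., 100 minutes), the frames can be derived in a loop from a single base window spec; a minimal sketch, assuming the columns from the schema above (base_win and df_means are illustrative names):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Single base window spec: partition by segId once, order by time once.
base_win = Window.partitionBy("segId").orderBy(F.col("time").cast("long"))

# Derive one logical rangeBetween frame per horizon from the same base spec.
means = [
    F.mean("val1")
    .over(base_win.rangeBetween(-(n * 60 - 1), 0))
    .alias("mean{}Mins".format(n))
    for n in range(1, 101)
]

df_means = df.select("segId", "time", "val1", *means)

# Whether Spark plans these as one Window operator (one partition and sort)
# or as one Window per frame is exactly what question 1 below asks.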

The physical plan for the three-window example shows three separate Window operators:

== Physical Plan ==
CollectLimit 21
+- *(6) Project [segId#0, cast(time#8 as string) AS time#102, cast(val1#2 as string) AS val1#97, cast(val2#3L as string) AS val2#98, cast(mean1Mins#63 as string) AS mean1Mins#99, cast(mean2Mins#71 as string) AS mean2Mins#100, cast(mean3Mins#80 as string) AS mean3Mins#101]
   +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#81L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -179, currentrow$())) AS mean3Mins#80], [segId#0], [_w0#81L ASC NULLS FIRST]
      +- *(5) Sort [segId#0 ASC NULLS FIRST, _w0#81L ASC NULLS FIRST], false, 0
         +- *(5) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, mean2Mins#71, cast(time#8 as bigint) AS _w0#81L]
            +- Window [avg(val2#3L) windowspecdefinition(segId#0, _w0#72L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -119, currentrow$())) AS mean2Mins#71], [segId#0], [_w0#72L ASC NULLS FIRST]
               +- *(4) Sort [segId#0 ASC NULLS FIRST, _w0#72L ASC NULLS FIRST], false, 0
                  +- *(4) Project [segId#0, time#8, val1#2, val2#3L, mean1Mins#63, cast(time#8 as bigint) AS _w0#72L]
                     +- Window [avg(val1#2) windowspecdefinition(segId#0, _w0#64L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -59, currentrow$())) AS mean1Mins#63], [segId#0], [_w0#64L ASC NULLS FIRST]
                        +- *(3) Sort [segId#0 ASC NULLS FIRST, _w0#64L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(segId#0, 1000)
                              +- *(2) Project [segId#0, time#8, val1#2, val2#3L, cast(time#8 as bigint) AS _w0#64L]
                                 +- *(2) Sort [cast(time#8 as bigint) ASC NULLS FIRST], true, 0
                                    +- Exchange rangepartitioning(cast(time#8 as bigint) ASC NULLS FIRST, 1000)
                                       +- Exchange hashpartitioning(segId#0, 1000)
                                          +- *(1) Project [segId#0, cast(time#1 as timestamp) AS time#8, val1#2, val2#3L]
                                             +- Scan ExistingRDD[segId#0,time#1,val1#2,val2#3L]

My questions are:

  1. Will Spark reuse the same physical partitioning for the multiple logical windows (i.e., partition and sort once)? Or will it create a separate partition-and-sort for each rangeBetween (which would be computationally expensive)?
  2. Any suggestions to improve the logic above so that different custom aggregations over different rangeBetween frames are computed on the same partitioning?

Tags: apache-spark, pyspark, apache-spark-sql

Solution


Will Spark reuse the same physical partitioning for the multiple logical windows (i.e., partition and sort once)? Or will it create a separate partition-and-sort for each rangeBetween (computationally expensive)?

I think so, too. Only Exchange operators repartition data, and in the plan above they all sit close to the data source (Scan ExistingRDD); between the three Window operators there is no further Exchange, only a Sort per window, so the hash partitioning by segId is reused.
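
A minimal way to check this on a concrete DataFrame is to inspect the physical plan; the sketch below assumes a DataFrame df_means with the window columns already defined (the second variant goes through an internal attribute, _jdf, which may change between Spark versions):

# Print the plans and look at where Exchange operators appear: only an
# Exchange repartitions data, while each Window operator may add its own
# Sort on top of the partitioning it receives.
df_means.explain(True)

# Rough programmatic check via the underlying Java plan object (internal API).
plan_str = df_means._jdf.queryExecution().executedPlan().toString()
print("shuffles:", plan_str.count("Exchange hashpartitioning"))
print("sorts:", plan_str.count("Sort ["))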

Any suggestions to improve the logic above so that different custom aggregations over different rangeBetween frames are computed on the same partitioning?

No idea here, sorry.

