首页 > 解决方案 > 在我的窗口中满足某个条件之前,我如何窗口化?

问题描述

我有一组包含 3 列感兴趣的数据。第一列是代表月份的日期。第二个是包含该月一些起始金额的列。第三个是表示该月金额减少的列。在一段时间内,我每个月都有多行数据。

例如,我们可能会得到一个日期为 2020-01-01,起始金额为 5MM,减少金额为 2MM。这意味着我们预计在月底会有 3MM 的剩余金额。

我需要计算在接下来的几个月中消耗这个起始数量需要多长时间。

给定上面的例子,如果我们从 5MM 开始,那个月消耗了 2MM,我们还剩下 3MM。如果下个月 2020-02-01 消耗 1.5MM,我们还剩下 1.5MM。如果下个月 2020-03-01 消耗了 2MM,我们还剩下 -0.5MM,我们在 2020-03-01 月份完成了消耗量。2020-03-01 的这个结果是我希望得到的。

我怎样才能获得这个值?

我想在 DataFrame 的每一行中获得一个结果,并且我需要对 DataFrame 的其余部分执行聚合以查看历史行。因此,我假设我需要使用 Window 来计算这个值。但是,我无法弄清楚如何正确获取实际的 Window 设置。

我在 Window 中的函数是采用称为“opening_amount”的起始量并在 Window 上减去称为“consume_amount”的燃尽量。例如,

  def followingWindowSpec: WindowSpec = Window.partitionBy(
    partitionCols : _*
  )
    .orderBy(orderByCols: _*)
    .rangeBetween(0, Window.unboundedFollowing)

  val burndownCompleteDateExpr = min(
    when(
      col("opening_amount")
        - sum(col("consume_amount"))
        .over(followingWindowSpec)
        <= lit(0),
      col("fiscal_dt")
    )
  )
    .over(followingWindowSpec)

我相信我需要使用一个从当前行开始并向前看的窗口。我尝试过使用 Window(0, x) ,其中 x 是某个值。

当我将两个 Windows 都设置为 UnboundedFollowing 时,我会得到每个月的财务报表。

当我将 consume_amount 总和窗口设置为使用 UnboundedPreceding 时,我得到了第一个月正确结果的正确结果,因为它具有 NULL 前面的值,但接下来的几个月要么返回同一个月(前几个月),要么返回他们自己的月份月(最初几个月之后)。

如果您能给我有关如何正确进行窗口化的指示,或者如果我在错误的树上吠叫正确的方法是什么,我将不胜感激。

样本数据:

+----+----------+-------+--------+-------------+
|item|      date|opening|consumed|out_of_supply|
+----+----------+-------+--------+-------------+
| 101|2020-01-01|   3200|    2000|   2020-02-01|
| 101|2020-02-01|   4600|    1500|         null|
| 101|2020-03-01|   1500|    1300|   2020-04-01|
| 101|2020-04-01|   4000|     500|         null|
| 220|2020-01-01|   3400|    2000|   2020-02-01|
| 220|2020-02-01|   1600|    3000|   2020-02-01|
| 220|2020-03-01|    310|    1000|   2020-03-01|
| 220|2020-04-01|    680|     500|         null|
+----+----------+-------+--------+-------------+

对于每一行,我将行中的消耗值汇总到 n 行,一次增加 n 个,以查看期初值何时被完全消耗。

比如2020-01-01的101项,开3200,第一个月消耗2000,月底1200。这1200被2020-02-01的1500完全消耗,所以2020- 02-01 是我正在寻找的月份。

对于 2020-02-01 中的第 101 项,它永远不会被完全消耗,因此我将默认返回 null。

标签: scalaapache-spark

解决方案


我建议使用Window/rowsBetween()为每一行组装consumed以下行中的数据列表,然后由 UDF 处理以捕获供应不足date

val df = Seq(
  (101, "2020-01-01", 3200, 2000),
  (101, "2020-02-01", 4600, 1500),
  (101, "2020-03-01", 1500, 1300),
  (101, "2020-04-01", 4000,  500),
  (220, "2020-01-01", 3400, 2000),
  (220, "2020-02-01", 1600, 3000),
  (220, "2020-03-01",  310, 1000),
  (220, "2020-04-01",  680,  500)
).toDF("item", "date", "opening", "consumed")

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
val win1 = Window.partitionBy("item").orderBy("date").
  rowsBetween(0, Window.unboundedFollowing)

val outOfStockDate = udf { (opening: Int, list: Seq[Row]) =>
  @scala.annotation.tailrec
  def loop(ls: List[(String, Int)], dt: String, acc: Int): String = ls match {
    case Nil =>
      null
    case head :: tail =>
      val accNew = acc + head._2
      if (opening <= accNew) head._1 else loop(tail, head._1, accNew)
  }
  loop(list.map{ case Row(d: String, c: Int) => (d, c) }.toList, null, 0)
}

df.
  withColumn("consumed_list", collect_list(struct($"date", $"consumed")).over(win1)).
  withColumn("out_of_supply", outOfStockDate($"opening", $"consumed_list")).
  drop("consumed_list").
  show
// +----+----------+-------+--------+-------------+
// |item|      date|opening|consumed|out_of_supply|
// +----+----------+-------+--------+-------------+
// | 101|2020-01-01|   3200|    2000|   2020-02-01|
// | 101|2020-02-01|   4600|    1500|         null|
// | 101|2020-03-01|   1500|    1300|   2020-04-01|
// | 101|2020-04-01|   4000|     500|         null|
// | 220|2020-01-01|   3400|    2000|   2020-02-01|
// | 220|2020-02-01|   1600|    3000|   2020-02-01|
// | 220|2020-03-01|    310|    1000|   2020-03-01|
// | 220|2020-04-01|    680|     500|         null|
// +----+----------+-------+--------+-------------+

推荐阅读