scala - 在我的窗口中满足某个条件之前,我如何窗口化?
问题描述
我有一组包含 3 列感兴趣的数据。第一列是代表月份的日期。第二个是包含该月一些起始金额的列。第三个是表示该月金额减少的列。在一段时间内,我每个月都有多行数据。
例如,我们可能会得到一个日期为 2020-01-01,起始金额为 5MM,减少金额为 2MM。这意味着我们预计在月底会有 3MM 的剩余金额。
我需要计算在接下来的几个月中消耗这个起始数量需要多长时间。
给定上面的例子,如果我们从 5MM 开始,那个月消耗了 2MM,我们还剩下 3MM。如果下个月 2020-02-01 消耗 1.5MM,我们还剩下 1.5MM。如果下个月 2020-03-01 消耗了 2MM,我们还剩下 -0.5MM,我们在 2020-03-01 月份完成了消耗量。2020-03-01 的这个结果是我希望得到的。
我怎样才能获得这个值?
我想在 DataFrame 的每一行中获得一个结果,并且我需要对 DataFrame 的其余部分执行聚合以查看历史行。因此,我假设我需要使用 Window 来计算这个值。但是,我无法弄清楚如何正确获取实际的 Window 设置。
我在 Window 中的函数是采用称为“opening_amount”的起始量并在 Window 上减去称为“consume_amount”的燃尽量。例如,
def followingWindowSpec: WindowSpec = Window.partitionBy(
partitionCols : _*
)
.orderBy(orderByCols: _*)
.rangeBetween(0, Window.unboundedFollowing)
val burndownCompleteDateExpr = min(
when(
col("opening_amount")
- sum(col("consume_amount"))
.over(followingWindowSpec)
<= lit(0),
col("fiscal_dt")
)
)
.over(followingWindowSpec)
我相信我需要使用一个从当前行开始并向前看的窗口。我尝试过使用 Window(0, x) ,其中 x 是某个值。
当我将两个 Windows 都设置为 UnboundedFollowing 时,我会得到每个月的财务报表。
当我将 consume_amount 总和窗口设置为使用 UnboundedPreceding 时,我得到了第一个月正确结果的正确结果,因为它具有 NULL 前面的值,但接下来的几个月要么返回同一个月(前几个月),要么返回他们自己的月份月(最初几个月之后)。
如果您能给我有关如何正确进行窗口化的指示,或者如果我在错误的树上吠叫正确的方法是什么,我将不胜感激。
样本数据:
+----+----------+-------+--------+-------------+
|item| date|opening|consumed|out_of_supply|
+----+----------+-------+--------+-------------+
| 101|2020-01-01| 3200| 2000| 2020-02-01|
| 101|2020-02-01| 4600| 1500| null|
| 101|2020-03-01| 1500| 1300| 2020-04-01|
| 101|2020-04-01| 4000| 500| null|
| 220|2020-01-01| 3400| 2000| 2020-02-01|
| 220|2020-02-01| 1600| 3000| 2020-02-01|
| 220|2020-03-01| 310| 1000| 2020-03-01|
| 220|2020-04-01| 680| 500| null|
+----+----------+-------+--------+-------------+
对于每一行,我将行中的消耗值汇总到 n 行,一次增加 n 个,以查看期初值何时被完全消耗。
比如2020-01-01的101项,开3200,第一个月消耗2000,月底1200。这1200被2020-02-01的1500完全消耗,所以2020- 02-01 是我正在寻找的月份。
对于 2020-02-01 中的第 101 项,它永远不会被完全消耗,因此我将默认返回 null。
解决方案
我建议使用Window/rowsBetween()
为每一行组装consumed
以下行中的数据列表,然后由 UDF 处理以捕获供应不足date
:
val df = Seq(
(101, "2020-01-01", 3200, 2000),
(101, "2020-02-01", 4600, 1500),
(101, "2020-03-01", 1500, 1300),
(101, "2020-04-01", 4000, 500),
(220, "2020-01-01", 3400, 2000),
(220, "2020-02-01", 1600, 3000),
(220, "2020-03-01", 310, 1000),
(220, "2020-04-01", 680, 500)
).toDF("item", "date", "opening", "consumed")
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
val win1 = Window.partitionBy("item").orderBy("date").
rowsBetween(0, Window.unboundedFollowing)
val outOfStockDate = udf { (opening: Int, list: Seq[Row]) =>
@scala.annotation.tailrec
def loop(ls: List[(String, Int)], dt: String, acc: Int): String = ls match {
case Nil =>
null
case head :: tail =>
val accNew = acc + head._2
if (opening <= accNew) head._1 else loop(tail, head._1, accNew)
}
loop(list.map{ case Row(d: String, c: Int) => (d, c) }.toList, null, 0)
}
df.
withColumn("consumed_list", collect_list(struct($"date", $"consumed")).over(win1)).
withColumn("out_of_supply", outOfStockDate($"opening", $"consumed_list")).
drop("consumed_list").
show
// +----+----------+-------+--------+-------------+
// |item| date|opening|consumed|out_of_supply|
// +----+----------+-------+--------+-------------+
// | 101|2020-01-01| 3200| 2000| 2020-02-01|
// | 101|2020-02-01| 4600| 1500| null|
// | 101|2020-03-01| 1500| 1300| 2020-04-01|
// | 101|2020-04-01| 4000| 500| null|
// | 220|2020-01-01| 3400| 2000| 2020-02-01|
// | 220|2020-02-01| 1600| 3000| 2020-02-01|
// | 220|2020-03-01| 310| 1000| 2020-03-01|
// | 220|2020-04-01| 680| 500| null|
// +----+----------+-------+--------+-------------+
推荐阅读
- r - 使用 apache 箭头在一个 R 数据帧中读取分区 parquet 目录(所有文件)
- javascript - ASP.Net Core 3 和简单的 POST 控制器
- javascript - 如何测试 google.maps.Geocoder?
- kubernetes - 是否可以/建议关闭 EKS 上的 NodeRestriction 插件?
- c# - SQL Server 存储过程抱怨缺少参数,但正在设置
- groovy - 如何将任意参数传递给包装的 groovy 函数?
- reactjs - 在带有组件导入的 .less 文件的反应项目中使用 LESS 变量
- vb.net - 如何使该程序作为 OOP 程序工作,以便我可以在代码中的任何地方使用我的数组?
- android - 应用未显示在 Google Play 商店搜索结果中
- javascript - 如何划分 XML 元素列表并根据节点的日期用 javscript 解析它?