首页 > 解决方案 > 如何用 0 填充 null 并使用 spark `pivot` 累积`count`?

问题描述

我有一些产品销售数据,如下所示:

产品 日期
一个 2020-01 60
2020-03 80
一个 2020-05 41
2020-08 50
2020-12 76
一个 2020-11 76

我想按以下方式对数据进行分组date和透视product 我的代码如下

     df.groupBy("date").pivot("product").agg(
      sum("money").as("month-sum"),
      sum(sum("money")).over(Window.orderBy("date").partitionBy("product")).as("cur-cumulative")
    ).orderBy("date").show()

结果是

|   date|A_month-sum|A_cur-cumulative|B_month-sum|B_cur-cumulative|
+-------+-----------+----------------+-----------+----------------+
|2020-01|         60|              60|       null|            null|
|2020-03|       null|            null|         80|             140|
|2020-05|         41|             181|       null|            null|
|2020-08|       null|            null|         50|             231|
|2020-11|         76|             307|       null|            null|
|2020-12|       null|            null|         76|             383|

我的期望是nullofmonth-sum可以用 0 填充,nullofcur-cumulative可以用最后一行的值填充,就像这样:

|   date|A_month-sum|A_cur-cumulative|B_month-sum|B_cur-cumulative|
+-------+-----------+----------------+-----------+----------------+
|2020-01|         60|              60|          0|               0|
|2020-03|          0|              60|         80|              80|
|2020-05|         41|             101|          0|              80|
|2020-08|          0|             101|         50|             130|
|2020-11|         76|             177|          0|             130|
|2020-12|          0|             177|         76|             206|
+-------+-----------+----------------+-----------+----------------+

有什么建议吗?提前致谢!

标签: sqlscalaapache-sparkapache-spark-sqlpivot

解决方案


你可以.na.fill(0)在做累计和之前做:

import org.apache.spark.sql.expressions.Window

val df2 = df
    .groupBy("date")
    .pivot("product")
    .agg(sum("money"))

val df3 = df2
    .na.fill(0)
    .select(
        col("date") +: 
        df2.columns.tail.flatMap(x => 
            Seq(
                col(x).as(x + "_month-sum"),
                sum(x).over(Window.orderBy("date")).as(x + "_cur-cumulative")
            )
        ): _*
    )
    .orderBy("date")

df3.show
+-------+-----------+----------------+-----------+----------------+
|   date|A_month-sum|A_cur-cumulative|B_month-sum|B_cur-cumulative|
+-------+-----------+----------------+-----------+----------------+
|2020-01|       60.0|            60.0|        0.0|             0.0|
|2020-03|        0.0|            60.0|       80.0|            80.0|
|2020-05|       41.0|           101.0|        0.0|            80.0|
|2020-08|        0.0|           101.0|       50.0|           130.0|
|2020-11|       76.0|           177.0|        0.0|           130.0|
|2020-12|        0.0|           177.0|       76.0|           206.0|
+-------+-----------+----------------+-----------+----------------+

推荐阅读