首页 > 解决方案 > Spark - 拖动前一天的值

问题描述

我正在尝试更新DataFrame 中itemPrice列的每日值。dailyRecords下面是该特定列的架构。

|-- dailyRecords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dayId: integer (nullable = true)
 |    |    |-- itemPrice: double (nullable = true)
 |    |    |-- itemsPurchased: integer (nullable = true)
 |    |    |-- itemSku: string (nullable = true)

(请注意,可以有许多其他列)。我们可以假设 dayId 可以是一个从 1 递增到 365 的序列。

因此,对于每个itemSkuwhen theitemsPurchased is greater than 0然后从itemPrice is equal to 0_itemPriceprevious dayIddayId is 10itemPrice from dayId 9itemPricedayId

任何导致所需解决方案或可能的方法的线索将不胜感激

谢谢!

在数学考虑评论后,当多列更有效地更新时,另一个替代解决方案:

上面的模式dailyRecords是通过组合来自的多个列获得的another dataframe。因此,考虑到评论,最好更新它itemPrice何时位于单独的数据框中,而不是在将列嵌套在一列中之后进行更新dailyRecords

所以这是我itemPrice使用foldLeft.

https://stackoverflow.com/a/62307771/12322995

请注意,我使用的这个解决方案foldLeft是因为我有更多的列要更新,而不仅仅是itemPrice问题之外。

标签: scaladataframeapache-spark

解决方案


struct在执行and之前,最好在 anotherDF 本身中生成正确的 itemPrice,collect_list如下所示:

scala> val anotherDF = List(
     | (1,10.11,5,"item1"),(2,15.45,3,"item1"),(3,0.0,3,"item1"),(4,17.50,4,"item1"),
     | (1,10.11,5,"item2"),(2,0.0,0,"item2"),(3,16.50,3,"item2"),(4,17.50,4,"item2"),
     | (1,20.20,5,"item3"),(2,0.0,3,"item3"),(3,30.50,3,"item3"),(4,0.0,4,"item3"),(5,0.0,4,"item3")
     | ).toDF("dayId","itemPrice","itemsPurchased","itemSku")
anotherDF: org.apache.spark.sql.DataFrame = [dayId: int, itemPrice: double ... 2 more fields]

scala> anotherDF.show
+-----+---------+--------------+-------+
|dayId|itemPrice|itemsPurchased|itemSku|
+-----+---------+--------------+-------+
|    1|    10.11|             5|  item1|
|    2|    15.45|             3|  item1|
|    3|      0.0|             3|  item1|
|    4|     17.5|             4|  item1|
|    1|    10.11|             5|  item2|
|    2|      0.0|             0|  item2|
|    3|     16.5|             3|  item2|
|    4|     17.5|             4|  item2|
|    1|     20.2|             5|  item3|
|    2|      0.0|             3|  item3|
|    3|     30.5|             3|  item3|
|    4|      0.0|             4|  item3|
|    5|      0.0|             4|  item3|
+-----+---------+--------------+-------+


scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val ww = Window.partitionBy("itemSku").orderBy("dayId")
ww: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@4cb9f248

scala> anotherDF.withColumn("updatedPrice", when(col("itemPrice")===0 && col("itemsPurchased")>0, lag("itemPrice",1).over(ww)).otherwise(col("itemPrice"))).show
+-----+---------+--------------+-------+------------+
|dayId|itemPrice|itemsPurchased|itemSku|updatedPrice|
+-----+---------+--------------+-------+------------+
|    1|     20.2|             5|  item3|        20.2|
|    2|      0.0|             3|  item3|        20.2|
|    3|     30.5|             3|  item3|        30.5|
|    4|      0.0|             4|  item3|        30.5|
|    5|      0.0|             4|  item3|         0.0|
|    1|    10.11|             5|  item2|       10.11|
|    2|      0.0|             0|  item2|         0.0|
|    3|     16.5|             3|  item2|        16.5|
|    4|     17.5|             4|  item2|        17.5|
|    1|    10.11|             5|  item1|       10.11|
|    2|    15.45|             3|  item1|       15.45|
|    3|      0.0|             3|  item1|       15.45|
|    4|     17.5|             4|  item1|        17.5|
+-----+---------+--------------+-------+------------+

然后使用updatedPricefromanotherDF作为你的itemPrice.


推荐阅读