scala - Spark - 拖动前一天的值
问题描述
我正在尝试更新DataFrame 中itemPrice
列的每日值。dailyRecords
下面是该特定列的架构。
|-- dailyRecords: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dayId: integer (nullable = true)
| | |-- itemPrice: double (nullable = true)
| | |-- itemsPurchased: integer (nullable = true)
| | |-- itemSku: string (nullable = true)
(请注意,可以有许多其他列)。我们可以假设 dayId 可以是一个从 1 递增到 365 的序列。
因此,对于每个itemSku
when theitemsPurchased is greater than 0
然后从itemPrice is equal to 0
_itemPrice
previous dayId
dayId is 10
itemPrice from dayId 9
itemPrice
dayId
任何导致所需解决方案或可能的方法的线索将不胜感激
谢谢!
在数学考虑评论后,当多列更有效地更新时,另一个替代解决方案:
上面的模式dailyRecords
是通过组合来自的多个列获得的another dataframe
。因此,考虑到评论,最好更新它itemPrice
何时位于单独的数据框中,而不是在将列嵌套在一列中之后进行更新dailyRecords
。
所以这是我itemPrice
使用foldLeft
.
https://stackoverflow.com/a/62307771/12322995
请注意,我使用的这个解决方案foldLeft
是因为我有更多的列要更新,而不仅仅是itemPrice
问题之外。
解决方案
struct
在执行and之前,最好在 anotherDF 本身中生成正确的 itemPrice,collect_list
如下所示:
scala> val anotherDF = List(
| (1,10.11,5,"item1"),(2,15.45,3,"item1"),(3,0.0,3,"item1"),(4,17.50,4,"item1"),
| (1,10.11,5,"item2"),(2,0.0,0,"item2"),(3,16.50,3,"item2"),(4,17.50,4,"item2"),
| (1,20.20,5,"item3"),(2,0.0,3,"item3"),(3,30.50,3,"item3"),(4,0.0,4,"item3"),(5,0.0,4,"item3")
| ).toDF("dayId","itemPrice","itemsPurchased","itemSku")
anotherDF: org.apache.spark.sql.DataFrame = [dayId: int, itemPrice: double ... 2 more fields]
scala> anotherDF.show
+-----+---------+--------------+-------+
|dayId|itemPrice|itemsPurchased|itemSku|
+-----+---------+--------------+-------+
| 1| 10.11| 5| item1|
| 2| 15.45| 3| item1|
| 3| 0.0| 3| item1|
| 4| 17.5| 4| item1|
| 1| 10.11| 5| item2|
| 2| 0.0| 0| item2|
| 3| 16.5| 3| item2|
| 4| 17.5| 4| item2|
| 1| 20.2| 5| item3|
| 2| 0.0| 3| item3|
| 3| 30.5| 3| item3|
| 4| 0.0| 4| item3|
| 5| 0.0| 4| item3|
+-----+---------+--------------+-------+
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val ww = Window.partitionBy("itemSku").orderBy("dayId")
ww: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@4cb9f248
scala> anotherDF.withColumn("updatedPrice", when(col("itemPrice")===0 && col("itemsPurchased")>0, lag("itemPrice",1).over(ww)).otherwise(col("itemPrice"))).show
+-----+---------+--------------+-------+------------+
|dayId|itemPrice|itemsPurchased|itemSku|updatedPrice|
+-----+---------+--------------+-------+------------+
| 1| 20.2| 5| item3| 20.2|
| 2| 0.0| 3| item3| 20.2|
| 3| 30.5| 3| item3| 30.5|
| 4| 0.0| 4| item3| 30.5|
| 5| 0.0| 4| item3| 0.0|
| 1| 10.11| 5| item2| 10.11|
| 2| 0.0| 0| item2| 0.0|
| 3| 16.5| 3| item2| 16.5|
| 4| 17.5| 4| item2| 17.5|
| 1| 10.11| 5| item1| 10.11|
| 2| 15.45| 3| item1| 15.45|
| 3| 0.0| 3| item1| 15.45|
| 4| 17.5| 4| item1| 17.5|
+-----+---------+--------------+-------+------------+
然后使用updatedPrice
fromanotherDF
作为你的itemPrice
.
推荐阅读
- javascript - 如何检查 form.reset() 是否改变了任何东西
- excel - 循环遍历所有可用的 OLAP 多维数据集过滤器值
- xamarin - 如何在 xamarin 表单中查找系统异常,并且在 Xamarin.Forms (PCL) 中找不到类型或命名空间名称“SystemException”
- javascript - MongoJS 不按 ID 删除
- htmlspecialchars - 使用 htmlspecialchars 动态标题作为短代码的参数
- python - 在 Python 中,检查是否以 root 身份执行
- python - 熊猫 DataFrame.plot() 方法
- c++ - C++ 标准中是否有关于精度和范围的浮点要求
- c++ - 在c ++中将字符串words[]转换为double feature[]
- excel - Excel VBA - 插入新行后向下拖动公式和格式