scala - 火花窗函数中未计算的正确值
问题描述
我有下表 初始数据
我需要转换usign Spark scala窗口函数,如下 所示
使用的代码在这里。我无法获得所需的 AssignmentId 值
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val data = Seq(
Row("1","E19819","1","2019-11-03","A6"),
Row("2","E19819","1","2019-11-02","A4"),
Row("3","E19819","1","2019-11-01","A3"),
Row("4","E19819","0","2019-11-01","A1"),
Row("5","E19819","0","2019-11-01","A2"),
Row("6","E19819","1","2019-10-28","A5")
)
val schema = StructType(
List(
StructField("UniqueId", StringType, true),
StructField("CustomerId", StringType, true),
StructField("Backoffice", StringType, true),
StructField("TxDateBase", StringType, true),
StructField("AssignmtId_orig", StringType, true)
)
)
val dfCustData = spark.createDataFrame(
spark.sparkContext.parallelize(data),
schema
).
withColumn("TxDate", to_date(col("TxDateBase")))
.withColumn("AssignmtId", when ( col("Backoffice")===1, col("AssignmtId_orig")).otherwise(null))
.drop("TxDateBase")
val cols = Seq("AssignmtId")
val w1 = Window
.partitionBy("CustomerId")
.orderBy($"TxDate",$"Backoffice".desc,$"UniqueId".desc)
.rangeBetween(Window.unboundedPreceding, Window.currentRow)
val w2 = Window
.partitionBy("CustomerId")
.orderBy($"TxDate",$"Backoffice".desc,$"UniqueId".desc)
.rangeBetween(Window.currentRow, Window.unboundedFollowing)
val dfCustTransformedData = cols .foldLeft(dfCustData)((dfCustupdated, columnName) =>
dfCustupdated.withColumn(columnName,
coalesce(col(columnName),
first(columnName, ignoreNulls = true).over(w2),
last(columnName, ignoreNulls = true).over(w1)
))
)
dfCustTransformedData.orderBy($"TxDate".desc,$"Backoffice".desc, $"uniqueId").show()
对于 uniqueid 4&5 的行,AssignmentId 应设置为 A3。现在它设置为A4
解决方案
推荐阅读
- javascript - 如何使用firebase函数从父文件夹中读取文件
- go - 如何为 kafka 消费者添加自定义消息反序列化器(使用 sarama lib)
- echarts - 为 echarts 设置全局千位分隔符
- ios - APNS 无法获取 DeviceToken
- url-rewriting - URL 重写规则以删除查询字符串后的斜杠
- html - 为什么我的 CSS flexbox 不会影响我的 div,即使它适用于其他部分
- python - 如何(快速)使用用于比较的函数迭代两个数据帧(熊猫)
- vue.js - 如何同时使用两种不同类型的存储
- elasticsearch - 为什么我的解析器类型没有出现在我的弹性索引中?
- arraylist - 无法启动活动 ComponentInfo - 由编辑 ArrayList 中的对象引起