首页 > 解决方案 > 火花窗函数中未计算的正确值

问题描述

我有下表 初始数据

我需要转换usign Spark scala窗口函数,如下 所示

使用的代码在这里。我无法获得所需的 AssignmentId 值

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val data = Seq(
    Row("1","E19819","1","2019-11-03","A6"),
    Row("2","E19819","1","2019-11-02","A4"),
    Row("3","E19819","1","2019-11-01","A3"),
    Row("4","E19819","0","2019-11-01","A1"),
    Row("5","E19819","0","2019-11-01","A2"),
    Row("6","E19819","1","2019-10-28","A5")



)
val schema = StructType(
  List(
    StructField("UniqueId", StringType, true),
    StructField("CustomerId", StringType, true),
    StructField("Backoffice", StringType, true),
    StructField("TxDateBase", StringType, true),
    StructField("AssignmtId_orig", StringType, true)
  )
)

val dfCustData = spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  schema
).
  withColumn("TxDate", to_date(col("TxDateBase")))
 .withColumn("AssignmtId", when ( col("Backoffice")===1, col("AssignmtId_orig")).otherwise(null))
 .drop("TxDateBase")

val cols = Seq("AssignmtId")
val w1 = Window
  .partitionBy("CustomerId")
  .orderBy($"TxDate",$"Backoffice".desc,$"UniqueId".desc)
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val w2 = Window
  .partitionBy("CustomerId")
  .orderBy($"TxDate",$"Backoffice".desc,$"UniqueId".desc)
  .rangeBetween(Window.currentRow, Window.unboundedFollowing)

val dfCustTransformedData = cols .foldLeft(dfCustData)((dfCustupdated, columnName) =>
      dfCustupdated.withColumn(columnName,
    coalesce(col(columnName),
      first(columnName, ignoreNulls = true).over(w2),
      last(columnName, ignoreNulls = true).over(w1)
    ))
)
dfCustTransformedData.orderBy($"TxDate".desc,$"Backoffice".desc, $"uniqueId").show()

对于 uniqueid 4&5 的行,AssignmentId 应设置为 A3。现在它设置为A4

标签: scalaapache-spark

解决方案


推荐阅读