首页 > 解决方案 > 使用 Spark SQL joinWith,如何连接两个数据集以根据日期将当前记录与其以前的记录相匹配?

问题描述

我正在尝试使用 joinWith 在 Spark SQL 中连接两个仪表读数数据集,以便返回的类型为 Dataset [(Reading, Reading)]。目标是根据日期列将第一个数据集中的每一行(称为 Current)与第二个数据集中的先前记录(称为 Previous)相匹配。

我需要先加入仪表键,然后通过比较日期加入,找到小于当前读数日期(即上一次读数)的下一个最大日期。

这是我尝试过的,但我认为这太微不足道了。我也收到 MAX 的“无法解决”错误。

val joined = Current.joinWith(
      Previous,
      (Current("Meter_Key") === Previous("Meter_Key"))
        && (Current("Reading_Dt_Key") > MAX(Previous("Reading_Dt_Key"))
    )

任何人都可以帮忙吗?

标签: scalaapache-spark-sqldataset

解决方案


没有尝试使用 LAG,认为这也可以。但是用 joinWith 查看了您的要求,并出于性能原因决定应用一些逻辑。作业中的许多步骤被跳过。使用不同的名称,您可以抽象、重命名和删除 cols。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

case class mtr0(mtr: String, seqNum: Int)
case class mtr(mtr: String, seqNum: Int, rank: Int)

// Gen data & optimize for JOINing, just interested in max 2 records for ranked sets.
val curr0 = Seq(
mtr0("m1", 1),
mtr0("m1", 2),
mtr0("m1", 3),
mtr0("m2", 7)
).toDS

val curr1 = curr0.withColumn("rank", row_number()
                 .over(Window.partitionBy($"mtr").orderBy($"seqNum".desc)))

// Reduce before JOIN.
val currF=curr1.filter($"rank" === 1 ).as[mtr]
//currF.show(false) 
val prevF=curr1.filter($"rank" === 2 ).as[mtr]
//prevF.show(false) 

val selfDF = currF.as("curr").joinWith(prevF.as("prev"),
( col("curr.mtr") === col("prev.mtr") && (col("curr.rank") === 1) && (col("prev.rank") === 2)),"left")

// Null value evident when only 1 entry per meter.
selfDF.show(false)

返回:

+----------+----------+
|_1        |_2        |
+----------+----------+
|[m1, 3, 1]|[m1, 2, 2]|
|[m2, 7, 1]|null      |
+----------+----------+

selfDF: org.apache.spark.sql.Dataset[(mtr, mtr)] = [_1: struct<mtr: string, seqNum: int ... 1 more field>, _2: struct<mtr: string, seqNum: int ... 1 more field>]

推荐阅读