Spark Scala: Join 2 tables and extract the data with the max date (please see description)

Problem description

I want to join two tables, A and B, and for each value in table B pick the record with the maximum date.

Consider the following tables:

Table A:

+---+-----+----------+
| id|Value|start_date|
+---+-----+----------+
| 1 |   a | 1/1/2018 |
| 2 |   a | 4/1/2018 |
| 3 |   a | 8/1/2018 |
| 4 |   c | 1/1/2018 |
| 5 |   d | 1/1/2018 |
| 6 |   e | 1/1/2018 |
+---+-----+----------+

Table B:

+---+-----+----------+
|Key|Value|sent_date |
+---+-----+----------+
| x |   a | 2/1/2018 |
| y |   a | 7/1/2018 |
| z |   a | 11/1/2018|
| p |   c | 5/1/2018 |
| q |   d | 5/1/2018 |
| r |   e | 5/1/2018 |
+---+-----+----------+

The objective is to bring in column id from Table A into Table B for every value in Table B. Tables A and B need to be joined on the column value, and for each record in B, find max(A.start_date) for the corresponding value in column Value of Table A, with the condition A.start_date < B.sent_date.

Let's consider value=a here. In table A, we can see 3 records for Value=a with 3 different start_date values. So, when joining with Table B, for value=a with sent_date=2/1/2018, find the record with max(start_date) among the records whose start_date is less than that sent_date (1/1/2018 in this case), and pull the corresponding A.id into Table B.

Similarly, for the record with value=a and sent_date=11/1/2018 in Table B, id=3 from Table A needs to be pulled into Table B.

The result should be as follows:

+---+-----+----------+---+
|Key|Value|sent_date |id |
+---+-----+----------+---+
| x |   a | 2/1/2018 | 1 |
| y |   a | 7/1/2018 | 2 |
| z |   a | 11/1/2018| 3 |
| p |   c | 5/1/2018 | 4 |
| q |   d | 5/1/2018 | 5 |
| r |   e | 5/1/2018 | 6 |
+---+-----+----------+---+
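In DataFrame terms, the rule described above can be sketched roughly as follows (dfA and dfB are hypothetical names for the two tables, and the date columns are assumed to already be parsed as date types rather than strings):

// Sketch of the matching rule only, not a complete solution: join on Value and
// keep the A rows dated strictly before the B row's sent_date; the remaining
// step is to keep, per B row, the candidate with the latest start_date.
val candidates = dfB.join(dfA, dfB("Value") === dfA("Value") && dfA("start_date") < dfB("sent_date"))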

I am using Spark 2.3. I have joined the two tables (using DataFrames) and found max(start_date) based on the condition, but I am unable to figure out how to extract the matching records from there.

Can anyone help me out here?

Thanks in advance!!

Tags: scala, apache-spark, hive, apache-spark-sql

Solution


I have just changed the date "11/1/2018" to "9/1/2018", because string sorting gives an incorrect result for it. When the columns are cast to dates, the logic still works. See below.
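To see why this matters: the join below compares the raw date strings, and lexicographically "11/1/2018" sorts before "8/1/2018" even though November is later than August. A quick illustration (not from the original post):

// Lexicographic string comparison: '1' < '8', so the November date looks "smaller".
val lexicographic = "11/1/2018" < "8/1/2018"   // true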

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df_a = Seq((1,"a","1/1/2018"),
     | (2,"a","4/1/2018"),
     | (3,"a","8/1/2018"),
     | (4,"c","1/1/2018"),
     | (5,"d","1/1/2018"),
     | (6,"e","1/1/2018")).toDF("id","valuea","start_date")
df_a: org.apache.spark.sql.DataFrame = [id: int, valuea: string ... 1 more field]

scala> val df_b = Seq(("x","a","2/1/2018"),
     | ("y","a","7/1/2018"),
     | ("z","a","9/1/2018"),
     | ("p","c","5/1/2018"),
     | ("q","d","5/1/2018"),
     | ("r","e","5/1/2018")).toDF("key","valueb","sent_date")
df_b: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 1 more field]

scala>  val df_join = df_b.join(df_a,'valueb==='valuea,"inner")
df_join: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 4 more fields]

scala> df_join.filter('sent_date >= 'start_date).withColumn("rank", rank().over(Window.partitionBy('key,'valueb,'sent_date).orderBy('start_date.desc))).filter('rank===1).drop("valuea","start_date","rank").show()
+---+------+---------+---+
|key|valueb|sent_date| id|
+---+------+---------+---+
|  q|     d| 5/1/2018|  5|
|  p|     c| 5/1/2018|  4|
|  r|     e| 5/1/2018|  6|
|  x|     a| 2/1/2018|  1|
|  y|     a| 7/1/2018|  2|
|  z|     a| 9/1/2018|  3|
+---+------+---------+---+


scala>
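The same result can also be obtained without window functions by first computing the maximum start_date per key and then joining back to recover the matching id, which is essentially the step missing from the approach described in the question. The sketch below reuses df_a and df_b from the session above (with spark.implicits._ in scope, as in spark-shell); like the original code it compares the date strings directly, so the same string-ordering caveat applies.

import org.apache.spark.sql.functions.max

// Join B to A on value, keeping only A rows dated on or before the B row's sent_date.
val joined = df_b.join(df_a, $"valueb" === $"valuea" && $"sent_date" >= $"start_date", "inner")

// Maximum start_date per B row (key is unique per B row here).
val maxPerKey = joined.groupBy("key").agg(max($"start_date").as("max_start_date"))

// Join back and keep only the row(s) that achieved that maximum.
joined.join(maxPerKey, Seq("key"))
  .filter($"start_date" === $"max_start_date")
  .select($"key", $"valueb", $"sent_date", $"id")
  .show()

Like the rank() version, this keeps several rows per key if more than one A record ties on the maximum start_date.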

Update

Below is a UDF that handles date strings in MM/dd/yyyy format.

scala> def dateConv(x:String):String=
     | {
     | val y = x.split("/").map(_.toInt).map("%02d".format(_))
     | y(2)+"-"+y(0)+"-"+y(1)
     | }
dateConv: (x: String)String

scala>  val udfdateconv = udf( dateConv(_:String):String )
udfdateconv: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> val df_a_dt = df_a.withColumn("start_date",date_format(udfdateconv('start_date),"yyyy-MM-dd").cast("date"))
df_a_dt: org.apache.spark.sql.DataFrame = [id: int, valuea: string ... 1 more field]

scala> df_a_dt.printSchema
root
 |-- id: integer (nullable = false)
 |-- valuea: string (nullable = true)
 |-- start_date: date (nullable = true)


scala> df_a_dt.show()
+---+------+----------+
| id|valuea|start_date|
+---+------+----------+
|  1|     a|2018-01-01|
|  2|     a|2018-04-01|
|  3|     a|2018-08-01|
|  4|     c|2018-01-01|
|  5|     d|2018-01-01|
|  6|     e|2018-01-01|
+---+------+----------+


scala>
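As a further sketch (not part of the original answer), the same conversion can be applied to df_b as well, so that both columns are real dates and the original 11/1/2018 value sorts correctly without having to change it to 9/1/2018. This continues the spark-shell session above (Window, rank and date_format are already in scope there); on Spark 2.2+, the built-in to_date($"sent_date", "M/d/yyyy") should also work in place of the UDF.

// Convert df_b's sent_date the same way df_a's start_date was converted.
val df_b_dt = df_b.withColumn("sent_date", date_format(udfdateconv($"sent_date"),"yyyy-MM-dd").cast("date"))

// Reuse the window logic from the accepted answer, now comparing real dates.
df_b_dt.join(df_a_dt, $"valueb" === $"valuea", "inner")
  .filter($"sent_date" >= $"start_date")
  .withColumn("rank", rank().over(Window.partitionBy($"key", $"valueb", $"sent_date").orderBy($"start_date".desc)))
  .filter($"rank" === 1)
  .drop("valuea", "start_date", "rank")
  .show()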
