scala - Spark Scala:加入 2 个表并提取最大日期的数据(请参阅说明)
问题描述
我想加入两个并为每个值tables A and B
选择具有最大日期的记录。table B
考虑下表:
Table A:
+---+-----+----------+
| id|Value|start_date|
+---+---- +----------+
| 1 | a | 1/1/2018 |
| 2 | a | 4/1/2018 |
| 3 | a | 8/1/2018 |
| 4 | c | 1/1/2018 |
| 5 | d | 1/1/2018 |
| 6 | e | 1/1/2018 |
+---+-----+----------+
Table B:
+---+-----+----------+
|Key|Value|sent_date |
+---+---- +----------+
| x | a | 2/1/2018 |
| y | a | 7/1/2018 |
| z | a | 11/1/2018|
| p | c | 5/1/2018 |
| q | d | 5/1/2018 |
| r | e | 5/1/2018 |
+---+-----+----------+
目的是为中的每个值引入列id
from 。同样,表和需要与列连接在一起,对于 中的每条记录,对于列中的每条数据,都有条件找到Table A
Table B
Table B
A
B
value
B
max(A.start_date)
Value
Table A
A.start_date < B.sent_date
让我们考虑value=a
这里。在table A,
我们可以看到 3 条Value=a
不同的 3 条记录start_date
。因此,当加入Table B
, for value=a
with时sent_date=2/1/2018
,记录小于的记录(在本例中为 2018 年 1 月 1 日),并将列中的相应数据拉到.max(start_date)
start_date
sent_date in Table B
A.id
Table B
同样对于带有value=a
和sent_date = 11/1/2018
in的记录,需要将Table B
表中的 id=3A
拉到table B
.
结果必须如下:
+---+-----+----------+---+
|Key|Value|sent_date |id |
+---+---- +----------+---+
| x | a | 2/1/2018 | 1 |
| y | a | 7/1/2018 | 2 |
| z | a | 11/1/2018| 3 |
| p | c | 5/1/2018 | 4 |
| q | d | 5/1/2018 | 5 |
| r | e | 5/1/2018 | 6 |
+---+-----+----------+---+
我正在使用 Spark 2.3。我已经加入了这两个表(使用 Dataframe)并max(start_date)
根据条件找到了。但我无法弄清楚如何在这里提取记录。
有谁可以帮我离开这里吗
提前致谢!!
解决方案
我刚刚将日期“2018 年 11 月 1 日”更改为“2018 年 9 月 1 日”,因为字符串排序给出了不正确的结果。当转换为日期时,逻辑仍然有效。见下文
scala> val df_a = Seq((1,"a","1/1/2018"),
| (2,"a","4/1/2018"),
| (3,"a","8/1/2018"),
| (4,"c","1/1/2018"),
| (5,"d","1/1/2018"),
| (6,"e","1/1/2018")).toDF("id","value","start_date")
df_a: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]
scala> val df_b = Seq(("x","a","2/1/2018"),
| ("y","a","7/1/2018"),
| ("z","a","9/1/2018"),
| ("p","c","5/1/2018"),
| ("q","d","5/1/2018"),
| ("r","e","5/1/2018")).toDF("key","valueb","sent_date")
df_b: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 1 more field]
scala> val df_join = df_b.join(df_a,'valueb==='valuea,"inner")
df_join: org.apache.spark.sql.DataFrame = [key: string, valueb: string ... 4 more fields]
scala> df_join.filter('sent_date >= 'start_date).withColumn("rank", rank().over(Window.partitionBy('key,'valueb,'sent_date).orderBy('start_date.desc))).filter('rank===1).drop("valuea","start_date","rank").show()
+---+------+---------+---+
|key|valueb|sent_date| id|
+---+------+---------+---+
| q| d| 5/1/2018| 5|
| p| c| 5/1/2018| 4|
| r| e| 5/1/2018| 6|
| x| a| 2/1/2018| 1|
| y| a| 7/1/2018| 2|
| z| a| 9/1/2018| 3|
+---+------+---------+---+
scala>
更新
下面是处理 MM/dd/yyyy 格式的日期字符串的 udf
scala> def dateConv(x:String):String=
| {
| val y = x.split("/").map(_.toInt).map("%02d".format(_))
| y(2)+"-"+y(0)+"-"+y(1)
| }
dateConv: (x: String)String
scala> val udfdateconv = udf( dateConv(_:String):String )
udfdateconv: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> val df_a_dt = df_a.withColumn("start_date",date_format(udfdateconv('start_date),"yyyy-MM-dd").cast("date"))
df_a_dt: org.apache.spark.sql.DataFrame = [id: int, valuea: string ... 1 more field]
scala> df_a_dt.printSchema
root
|-- id: integer (nullable = false)
|-- valuea: string (nullable = true)
|-- start_date: date (nullable = true)
scala> df_a_dt.show()
+---+------+----------+
| id|valuea|start_date|
+---+------+----------+
| 1| a|2018-01-01|
| 2| a|2018-04-01|
| 3| a|2018-08-01|
| 4| c|2018-01-01|
| 5| d|2018-01-01|
| 6| e|2018-01-01|
+---+------+----------+
scala>
推荐阅读
- php - 我想获取所有用户标记的帖子。如果相同ID的帖子重复,如何忽略?
- java - “迁移到 Junit 5”选项在 IntelliJ 2018.3.6 中对于 groovy 类不可用?
- microservices - Aerospilke 的交易记录
- angular - 角度属性指令可以替换父元素和子元素吗?
- python - 在 ubuntu 14 上安装 pandas 时出错
- ios - iOS 将 carplay 连接到 SDA 无线
- amazon-web-services - AWS CloudFormation 的全局环境变量
- python - Python re.match 仅在第一个 \n 之前匹配
- google-apps-script - “需要授权才能执行该操作”用于警报/弹出窗口,电子表格的 Google Apps 脚本插件
- javascript - Ajax 响应没有响应 json 响应