SparkSql: Efficient way of doing a left outer join while keeping the boundaries of the right dataset

Problem description

I need to join two time-series datasets (left and right). I must consider all records from the left dataset, even when there is no matching record in the right dataset (I can use a left outer join for that). But at the same time I must keep the start and end boundaries of the right dataset, i.e. the joined output should only cover timestamps from min(right.Timestamp) to max(right.Timestamp).

left dataset:

+-----------+-------+
| Timestamp | L_val |
+-----------+-------+
| …         | …     |
+-----------+-------+
| …         | …     |
+-----------+-------+
| 10001     | 346   |
+-----------+-------+
| 10002     | 987   |
+-----------+-------+
| 10003     | 788   |
+-----------+-------+
| 10004     | 567   |
+-----------+-------+
| 10005     | 665   |
+-----------+-------+
| 10006     | 654   |
+-----------+-------+
| 10007     | 345   |
+-----------+-------+
| 10008     | 565   |
+-----------+-------+
| 10009     | 567   |
+-----------+-------+
| ….        | ….    |
+-----------+-------+
| …         | …     |
+-----------+-------+


right dataset:
+-----------+-------+
| Timestamp | R_val |
+-----------+-------+
| 10004     | 345   |
+-----------+-------+
| 10005     | 654   |
+-----------+-------+
| 10007     | 65    |
+-----------+-------+
| 10008     | 234   |
+-----------+-------+

required-joined-dataset:
+-----------+-------+-------+
| Timestamp | L_val | R_val |
+-----------+-------+-------+
| 10004     | 567   | 345   |
+-----------+-------+-------+
| 10005     | 665   | 654   |
+-----------+-------+-------+
| 10006     | 654   |       |
+-----------+-------+-------+
| 10007     | 345   | 65    |
+-----------+-------+-------+
| 10008     | 565   | 234   |
+-----------+-------+-------+

Tags: apache-spark, apache-spark-sql

Solution
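
For reference, the two example DataFrames used in the transcript below can be reproduced in spark-shell roughly as follows (a minimal sketch: the names df_L and df_R and the sample values come from the question; building them via toDF on a Seq is an assumption, and Timestamp is taken as a Long):

// Sketch: build the example data from the question as DataFrames.
// In spark-shell, spark.implicits._ is already imported, so toDF is available.
val df_L = Seq(
  (10001L, 346), (10002L, 987), (10003L, 788),
  (10004L, 567), (10005L, 665), (10006L, 654),
  (10007L, 345), (10008L, 565), (10009L, 567)
).toDF("Timestamp", "L_val")

val df_R = Seq(
  (10004L, 345), (10005L, 654), (10007L, 65), (10008L, 234)
).toDF("Timestamp", "R_val")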


scala> df_L.show(false)
+---------+-----+
|Timestamp|L_val|
+---------+-----+
|10001    |346  |
|10002    |987  |
|10003    |788  |
|10004    |567  |
|10005    |665  |
|10006    |654  |
|10007    |345  |
|10008    |565  |
|10009    |567  |
+---------+-----+


scala> df_R.show(false)
+---------+-----+
|Timestamp|R_val|
+---------+-----+
|10004    |345  |
|10005    |654  |
|10007    |65   |
|10008    |234  |
+---------+-----+


scala> val minTime = df_R.select(min("Timestamp")).rdd.collect.map(r => r(0)).mkString.toLong
minTime: Long = 10004

scala> val maxTime = df_R.select(max("Timestamp")).rdd.collect.map(r => r(0)).mkString.toLong
maxTime: Long = 10008
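
As an aside, both boundaries can also be fetched in a single job instead of two separate actions; a minimal sketch, assuming Timestamp is a long column and that org.apache.spark.sql.functions._ is imported as in the transcript above:

// Sketch: collect min and max of the right dataset's Timestamp in one pass
val bounds = df_R.agg(min("Timestamp"), max("Timestamp")).head
val minTime = bounds.getLong(0)   // 10004
val maxTime = bounds.getLong(1)   // 10008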

scala> df_L.alias("L").join(df_R.alias("R"), List("Timestamp"), "left").filter(col("L.Timestamp") >= minTime && col("L.Timestamp") <= maxTime ).na.fill("").show(false)
+---------+-----+-----+
|Timestamp|L_val|R_val|
+---------+-----+-----+
|10004    |567  |345  |
|10005    |665  |654  |
|10006    |654  |     |
|10007    |345  |65   |
|10008    |565  |234  |
+---------+-----+-----+

Or (this variant skips na.fill, so an unmatched R_val is shown as null):

// More efficient: filter the left DataFrame down to the right dataset's time window first, then left-join the result with df_R

scala> df_L.alias("L").filter(col("L.Timestamp") >= minTime && col("L.Timestamp") <= maxTime ).join(df_R.alias("R"), List("Timestamp"), "left").show(false)
+---------+-----+-----+
|Timestamp|L_val|R_val|
+---------+-----+-----+
|10004    |567  |345  |
|10005    |665  |654  |
|10006    |654  |null |
|10007    |345  |65   |
|10008    |565  |234  |
+---------+-----+-----+
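
For completeness (the question is also tagged apache-spark-sql), the same filter-then-join logic can be expressed as a SQL statement; a minimal sketch, assuming df_L and df_R are registered as temporary views (the view names L and R are arbitrary):

// Sketch: Spark SQL equivalent of the filter-then-left-join above
df_L.createOrReplaceTempView("L")
df_R.createOrReplaceTempView("R")

spark.sql("""
  SELECT L.Timestamp, L.L_val, R.R_val
  FROM L
  LEFT JOIN R ON L.Timestamp = R.Timestamp
  WHERE L.Timestamp >= (SELECT min(Timestamp) FROM R)
    AND L.Timestamp <= (SELECT max(Timestamp) FROM R)
""").show(false)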
