apache-spark - SparkSql: an efficient left outer join that keeps the boundaries of the right dataset
Problem
I need to join two time-series datasets (left and right). I must keep every record from the left dataset, even when there is no matching record in the right dataset (a left outer join handles that part). At the same time, the result must stay within the start and end boundaries of the right dataset.
left dataset:
+-----------+-------+
| Timestamp | L_val |
+-----------+-------+
| ...       | ...   |
| 10001     | 346   |
| 10002     | 987   |
| 10003     | 788   |
| 10004     | 567   |
| 10005     | 665   |
| 10006     | 654   |
| 10007     | 345   |
| 10008     | 565   |
| 10009     | 567   |
| ...       | ...   |
+-----------+-------+
right dataset:
+-----------+-------+
| Timestamp | R_val |
+-----------+-------+
| 10004     | 345   |
| 10005     | 654   |
| 10007     | 65    |
| 10008     | 234   |
+-----------+-------+
required joined dataset:
+-----------+-------+-------+
| Timestamp | L_val | R_val |
+-----------+-------+-------+
| 10004     | 567   | 345   |
| 10005     | 665   | 654   |
| 10006     | 654   |       |
| 10007     | 345   | 65    |
| 10008     | 565   | 234   |
+-----------+-------+-------+
Solution
First compute the time boundaries from the right dataset, then left-join and restrict the result to that range:
scala> df_L.show(false)
+---------+-----+
|Timestamp|L_val|
+---------+-----+
|10001 |346 |
|10002 |987 |
|10003 |788 |
|10004 |567 |
|10005 |665 |
|10006 |654 |
|10007 |345 |
|10008 |565 |
|10009 |567 |
+---------+-----+
scala> df_R.show(false)
+---------+-----+
|Timestamp|R_val|
+---------+-----+
|10004 |345 |
|10005 |654 |
|10007 |65 |
|10008 |234 |
+---------+-----+
scala> val minTime = df_R.agg(min("Timestamp")).first.get(0).toString.toLong
minTime: Long = 10004
scala> val maxTime = df_R.agg(max("Timestamp")).first.get(0).toString.toLong
maxTime: Long = 10008
scala> df_L.alias("L").join(df_R.alias("R"), List("Timestamp"), "left").filter(col("L.Timestamp") >= minTime && col("L.Timestamp") <= maxTime ).na.fill("").show(false)
+---------+-----+-----+
|Timestamp|L_val|R_val|
+---------+-----+-----+
|10004 |567 |345 |
|10005 |665 |654 |
|10006 |654 | |
|10007 |345 |65 |
|10008 |565 |234 |
+---------+-----+-----+
Or:
// More efficient: filter the left DataFrame down to the boundary range first, then join
scala> df_L.alias("L").filter(col("L.Timestamp") >= minTime && col("L.Timestamp") <= maxTime ).join(df_R.alias("R"), List("Timestamp"), "left").show(false)
+---------+-----+-----+
|Timestamp|L_val|R_val|
+---------+-----+-----+
|10004 |567 |345 |
|10005 |665 |654 |
|10006 |654 |null |
|10007 |345 |65 |
|10008 |565 |234 |
+---------+-----+-----+
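The same bounded-left-join semantics can be sketched in plain Scala collections (no Spark needed), using the sample data from the question. This is illustrative only; `left` and `right` below stand in for the two DataFrames:

```scala
// Sample data from the question, as Timestamp -> value maps (illustrative only).
val left: Map[Long, Int] = Map(
  10001L -> 346, 10002L -> 987, 10003L -> 788, 10004L -> 567, 10005L -> 665,
  10006L -> 654, 10007L -> 345, 10008L -> 565, 10009L -> 567)
val right: Map[Long, Int] = Map(
  10004L -> 345, 10005L -> 654, 10007L -> 65, 10008L -> 234)

// The boundaries come from the right dataset.
val minTime = right.keys.min
val maxTime = right.keys.max

// Keep every left record inside [minTime, maxTime]; the right value may be absent (None),
// which mirrors the null R_val produced by the left outer join.
val joined: Seq[(Long, Int, Option[Int])] =
  left.toSeq
    .filter { case (ts, _) => ts >= minTime && ts <= maxTime }
    .sortBy(_._1)
    .map { case (ts, lVal) => (ts, lVal, right.get(ts)) }
```

Here `joined` contains exactly the five rows of the required output, with `None` at timestamp 10006 where the right dataset has no match.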