apache-spark - 在 Spark 中写入唯一值,同时保留旧值
问题描述
我有一个按计划执行的 Spark 作业。
当我将结果 DataFrame 写入数据目标(S3、HDFS、DB ...)时,我希望 Spark 写入的内容不会针对特定列重复。
例子:
假设这MY_ID
是唯一的列。
第一次执行:
--------------
|MY_ID|MY_VAL|
--------------
| 1 | 5 |
| 2 | 9 |
| 3 | 6 |
--------------
第二次执行:
--------------
|MY_ID|MY_VAL|
--------------
| 2 | 9 |
| 3 | 2 |
| 4 | 4 |
--------------
在 2 次执行之后,我期望在Data Target中找到如下内容:
--------------
|MY_ID|MY_VAL|
--------------
| 1 | 5 |
| 2 | 9 |
| 3 | 6 |
| 4 | 4 |
--------------
预期输出是第一次执行的结果,附加了第二次执行的结果。如果 的值MY_ID
已经存在,则保留旧的,丢弃新执行的结果(在这种情况下,第二次执行想要为MY_ID
3写入MY_VAL
9。由于该记录在第一次执行时已经存在,因此新记录是丢弃)。
所以distinct()
功能不足以保证这个条件。MY_ID
即使在转储的输出中也应保持列的唯一性。
是否有任何解决方案可以以合理的计算成本保证此属性?UNIQUE
(这与关系数据库中的想法基本相同。)
解决方案
您可以fullOuterJoin
在第一次和第二次迭代中进行。
val joined = firstIteration.join(secondIteration, Seq("MY_ID"), "fullouter")
scala> joined.show
+-----+------+------+
|MY_ID|MY_VAL|MY_VAL|
+-----+------+------+
| 1| 5| null|
| 3| 6| 2|
| 4| null| 4|
| 2| 9| 9|
+-----+------+------+
从结果表中,如果 firstIterationMY_VAL
有值,您可以按原样使用它。否则,如果它null
(表示密钥仅在第二次迭代中出现)。使用来自 secondIteration 的值MY_VAL
。
scala> joined.withColumn("result", when(firstIteration.col("MY_VAL").isNull, secondIteration.col("MY_VAL"))
.otherwise(firstIteration.col("MY_VAL")))
.drop("MY_VAL")
.show
+-----+------+
|MY_ID|result|
+-----+------+
| 1| 5|
| 3| 6|
| 4| 4|
| 2| 9|
+-----+------+
推荐阅读
- python - 如何从客户那里收取多笔付款并在 Django 中计算剩余余额?
- facebook - FB graph Api:使用自定义标签获取用户
- angular - *ngFor 在数据库响应后不更新
- algorithm - 我们如何渲染抗锯齿 2D 多边形?
- python - Python:增加饼图中文本标签之间的间距
- azure - 解决方案请 - 需要从 Azure SDK 2.7 更新到 2.9
- reactjs - 从共享点检索数据
- firebase - 是否可以使用 React Native Firebase 中的下载 URI 获取 firebase 存储中文件的引用?
- reactjs - ReactJS 中的 Cookie.js 在渲染/路由到配置文件页面时将角色值设置为未定义
- python - 我想从 python 3.8 的列表中删除特定元素及其重复项