首页 > 解决方案 > 在 Spark 中写入唯一值,同时保留旧值

问题描述

我有一个按计划执行的 Spark 作业。

当我将结果 DataFrame 写入数据目标(S3、HDFS、DB ...)时,我希望 Spark 写入的内容不会针对特定列重复。

例子:

假设这MY_ID是唯一的列。

第一次执行:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
--------------

第二次执行:

--------------
|MY_ID|MY_VAL|
--------------
|  2  |   9  |
|  3  |   2  |
|  4  |   4  |
--------------

在 2 次执行之后,我期望在Data Target中找到如下内容:

--------------
|MY_ID|MY_VAL|
--------------
|  1  |   5  |
|  2  |   9  |
|  3  |   6  |
|  4  |   4  |
--------------

预期输出是第一次执行的结果,附加了第二次执行的结果。如果 的值MY_ID已经存在,则保留旧的,丢弃新执行的结果(在这种情况下,第二次执行想要为MY_ID3写入MY_VAL9。由于该记录在第一次执行时已经存在,因此新记录是丢弃)。

所以distinct()功能不足以保证这个条件。MY_ID即使在转储的输出中也应保持列的唯一性。

是否有任何解决方案可以以合理的计算成本保证此属性?UNIQUE(这与关系数据库中的想法基本相同。)

标签: apache-sparkdataframeapache-spark-sql

解决方案


您可以fullOuterJoin在第一次和第二次迭代中进行。

val joined = firstIteration.join(secondIteration, Seq("MY_ID"), "fullouter")

scala> joined.show
+-----+------+------+
|MY_ID|MY_VAL|MY_VAL|
+-----+------+------+
|    1|     5|  null|
|    3|     6|     2|
|    4|  null|     4|
|    2|     9|     9|
+-----+------+------+

从结果表中,如果 firstIterationMY_VAL有值,您可以按原样使用它。否则,如果它null(表示密钥仅在第二次迭代中出现)。使用来自 secondIteration 的值MY_VAL

scala> joined.withColumn("result", when(firstIteration.col("MY_VAL").isNull, secondIteration.col("MY_VAL"))
        .otherwise(firstIteration.col("MY_VAL")))
       .drop("MY_VAL")
       .show
+-----+------+
|MY_ID|result|
+-----+------+
|    1|     5|
|    3|     6|
|    4|     4|
|    2|     9|
+-----+------+

推荐阅读