首页 > 解决方案 > 如何使用 spark 插入 HDFS?

问题描述

我在 HDFS 中对数据进行了分区。在某个时候,我决定更新它。算法是:

问题是,如果新数据具有磁盘上尚不存在的分区怎么办。在这种情况下,它们不会被写入。https://stackoverflow.com/a/49691528/10681828 <- 例如,此解决方案不会写入新分区。 在此处输入图像描述

上图描述了这种情况。让我们将左侧磁盘视为已经存在于 HDFS 中的分区,将右侧磁盘视为我们刚刚从 Kafka 收到的分区。

正确磁盘的某些分区将与已经存在的分区相交,而其他分区则不会。而这段代码:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)

无法将图片的蓝色部分写入磁盘。

那么,我该如何解决这个问题呢?请提供代码。我正在寻找高性能的东西。

不明白的人举个例子:

假设我们在 HDFS 中有这些数据:

现在我们收到这个新数据:

因此,分区 A 和 B 在 HDFS 中,分区 B 和 C 是新分区,由于 B 在 HDFS 中,我们对其进行更新。我想写C。所以最终结果应该是这样的:

但是如果我使用上面的代码,我会得到:

因为 spark 2.3 的新功能overwrite dynamic无法创建 PartitionC。

更新:事实证明,如果您改用 hive 表,这将起作用。但是如果你使用纯火花它不会......所以,我猜蜂巢的覆盖和火花的覆盖工作不同。

标签: apache-sparkapache-spark-sqlhdfsbigdata

解决方案


最后,我决定从 HDFS 中删除分区的“绿色”子集,并SaveMode.Append改用它。我认为这是火花中的一个错误。


推荐阅读