apache-spark - 如何使用 spark 插入 HDFS?
问题描述
我在 HDFS 中对数据进行了分区。在某个时候,我决定更新它。算法是:
- 从 kafka 主题中读取新数据。
- 找出新数据的分区名称。
- 从 HDFS 中具有这些名称的分区加载数据。
- 将 HDFS 数据与新数据合并。
- 覆盖已经在磁盘上的分区。
问题是,如果新数据具有磁盘上尚不存在的分区怎么办。在这种情况下,它们不会被写入。https://stackoverflow.com/a/49691528/10681828 <- 例如,此解决方案不会写入新分区。
上图描述了这种情况。让我们将左侧磁盘视为已经存在于 HDFS 中的分区,将右侧磁盘视为我们刚刚从 Kafka 收到的分区。
正确磁盘的某些分区将与已经存在的分区相交,而其他分区则不会。而这段代码:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
.write
.mode(SaveMode.Overwrite)
.partitionBy("date", "key")
.option("header", "true")
.format(format)
.save(path)
无法将图片的蓝色部分写入磁盘。
那么,我该如何解决这个问题呢?请提供代码。我正在寻找高性能的东西。
不明白的人举个例子:
假设我们在 HDFS 中有这些数据:
- PartitionA 有数据“1”
- 分区 B 有数据“1”
现在我们收到这个新数据:
- 分区 B 有数据“2”
- PartitionC 有数据“1”
因此,分区 A 和 B 在 HDFS 中,分区 B 和 C 是新分区,由于 B 在 HDFS 中,我们对其进行更新。我想写C。所以最终结果应该是这样的:
- PartitionA 有数据“1”
- 分区 B 有数据“2”
- PartitionC 有数据“1”
但是如果我使用上面的代码,我会得到:
- PartitionA 有数据“1”
- 分区 B 有数据“2”
因为 spark 2.3 的新功能overwrite dynamic
无法创建 PartitionC。
更新:事实证明,如果您改用 hive 表,这将起作用。但是如果你使用纯火花它不会......所以,我猜蜂巢的覆盖和火花的覆盖工作不同。
解决方案
最后,我决定从 HDFS 中删除分区的“绿色”子集,并SaveMode.Append
改用它。我认为这是火花中的一个错误。
推荐阅读
- apex - 将额外参数从 ebs 传递到 apex
- google-cloud-platform - Terraform:如何在不锁定自己的情况下使用 iam_policy
- python - 在数据框的列中填充连续的 NAN
- redis - spring 将 redis 轮询器与事务集成
- smalltalk - 有没有特定的方法可以在 Pharo smalltalk 中获取计算机的 IP?
- dictionary - golang struct 并发读写不带锁也运行ok?
- android - 适当的地方 signOut() 方法 google api
- regex - 正则表达式:匹配模式 0 次或更多次何时有用?
- node.js - 节点进程在后续请求中找不到 setTimeout 对象
- c# - 在 ReportViewer 中表示“列表列表”