首页 > 解决方案 > 拼花地板 Pyspark 中的 UPSERT

问题描述

我在 s3 中有带有以下分区的镶木地板文件:年 / 月 / 日期 / some_id 使用 Spark (PySpark),在过去的14 天里,我每天都想进行一种 UPSERT - 我想替换 s3 中的现有数据(一个parquet 文件),但不要删除 14 天之前的日期。我尝试了两种保存模式: 追加- 不好,因为它只是添加了另一个文件。 overwrite - 正在删除过去的数据和其他分区的数据。

有没有办法或最佳实践来克服这个问题?我应该在每次运行中从 s3 读取所有数据,然后再写回来吗?也许重命名文件以便追加将替换 s3 中的当前文件?

非常感谢!

标签: amazon-s3pysparketlparquet

解决方案


我通常会做类似的事情。就我而言,我执行 ETL 并将一天的数据附加到镶木地板文件中:

关键是使用您要写入的数据(在我的情况下是实际日期),确保按date列分区并覆盖当前日期的所有数据。

这将保留所有旧数据。举个例子:

(
    sdf
    .write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("date")
    .option("replaceWhere", "2020-01-27")
    .save(uri)
)

您还可以查看delta.io,它是parquet格式的扩展,提供了一些有趣的功能,例如ACID事务。


推荐阅读