首页 > 解决方案 > Delta Lake 增量清单文件生成

问题描述

我正在尝试使用开源 delta Lake api 在 S3 上设置 delta Lake。我的表按日期分区,我必须执行合并(合并也可能更新旧分区)。我正在生成清单文件,以便我可以使用 AWS Athena 来查询 delta Lake,但是当我运行清单文件生成方法时,delta Lakes 会为所有分区创建清单文件。有没有办法生成增量清单文件,仅为最后更新的分区创建/更新文件,或者您是否可以指定分区来生成清单文件。

df = spark.read.csv(s3://temp/2020-01-01.csv)
delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.alias("source").merge(df.alias("new_data"), condition).whenNotMatchedInsertAll().execute()

delta_table.generate("symlink_format_manifest")

标签: apache-sparkamazon-athenadelta-lake

解决方案


我遇到了同样的问题,在一张有大量分区的大桌子上运行清单是一种矫枉过正。我能够通过以下两种方法解决它(解决方法)

  1. 所以最简单的是,使用 spark 在 Hive Metastore 中使用 DDL 创建您的增量表,提供文件夹(S3)的位置以及TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true). 使用 spark 将数据加载到同一位置,这将在附加/覆盖数据后立即为任何分区创建/更新清单文件。

spark.sql("CREATE TABLE student (id INT, name STRING, age INT) USING delta PARTITIONED BY (age) LOCATION 's3://path/student' TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)")

对于一个新表,这应该不是问题,但是,上面是一个已经创建并且需要重新加载的表的解决方法。

  1. 我遵循的另一个选项(一个棘手的选项)是hdfs dfs -cat s3://path/student/_delta_log/*.json | grep 'metadata'在 _delta_log 文件夹中找到并复制元数据文件()。commitInfo-->operationParameters在as"properties":"{\"delta.compatibility.symlinkFormatManifest.enabled\":\"true\"}metaDataas下添加相同的上述 TBLPROPERTIES"configuration":{"delta.compatibility.symlinkFormatManifest.enabled":"true"} 创建一个新的 .json 并将文件名重命名为(last sequence of the json in _delta_log folder+1).json并将其移动到 _delta_log。下一次加载之后,您可以看到它正在自动创建清单文件。

推荐阅读