首页 > 解决方案 > 合并到 deltalake 表中更新所有行

问题描述

我正在尝试使用 spark 数据框更新 deltalake 表。我想要做的是更新 spark 数据帧中与 deltalake 表中不同的所有行,并插入 deltalake 表中缺少的所有行。

我试图这样做如下:

import io.delta.tables._

val not_equal_string = df.schema.fieldNames.map(fn => 
    s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
    ).reduceLeft((x,y) => s"$x OR $y ")

val deltaTable = DeltaTable.forPath(spark, "s3a://sparkdata/delta-table")

deltaTable.as("history").merge(
    df.as("updates"), "updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateAll().whenNotMatched().insertAll().execute()

这可行,但是当我查看生成的增量表时,我发现即使我没有更新单个记录,它的大小也有效地翻了一番。生成了一个新的 json 文件,其中删除了每个旧分区并添加了所有新分区。

当我只是使用whenMatched标准作为 where 条件运行 sql join 时,我没有得到一行。

我希望在这样的合并操作之后不会改变增量表。我错过了一些简单的东西吗?

标签: scalaapache-sparkdelta-lake

解决方案


推荐阅读