scala - 合并到 deltalake 表中更新所有行
问题描述
我正在尝试使用 spark 数据框更新 deltalake 表。我想要做的是更新 spark 数据帧中与 deltalake 表中不同的所有行,并插入 deltalake 表中缺少的所有行。
我试图这样做如下:
import io.delta.tables._
val not_equal_string = df.schema.fieldNames.map(fn =>
s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
).reduceLeft((x,y) => s"$x OR $y ")
val deltaTable = DeltaTable.forPath(spark, "s3a://sparkdata/delta-table")
deltaTable.as("history").merge(
df.as("updates"), "updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateAll().whenNotMatched().insertAll().execute()
这可行,但是当我查看生成的增量表时,我发现即使我没有更新单个记录,它的大小也有效地翻了一番。生成了一个新的 json 文件,其中删除了每个旧分区并添加了所有新分区。
当我只是使用whenMatched标准作为 where 条件运行 sql join 时,我没有得到一行。
我希望在这样的合并操作之后不会改变增量表。我错过了一些简单的东西吗?
解决方案
推荐阅读
- excel - 使用 powershell 创建监视文件夹以更改加载到其中的多个 Excel 工作表中的特定列(从一般到最新)
- java - 复制公式 POI 的单元格值
- python - 使用默认值验证 django 中的表单字段
- python - 如何在 Packrat 环境中使用 rpy2?
- hibernate - Spring Data/Hibernate 使用了错误的列名
- angular - 在旧的角度版本(版本 5,4..)中使用角度元素
- unity3d - 使用 Unity 构建的 Oculus Go VR 应用程序中的 Web 浏览器
- git - 之间的区别。git中的(点)和*(星号)通配符
- tensorflow - Tensorflow 1.12 将 dataset.filter 应用于 dataset.window
- node.js - AWS S3 - 从 getSignedUrl() PUT 到 URL 返回 403 SignatureDoesNotMatch 错误