首页 > 解决方案 > 如何防止重复条目进入 Azure Storage 的 delta Lake

问题描述

我有一个以 delta 格式存储到 Adls 中的数据框,现在当我尝试将新的更新行附加到该 delta 湖时,它应该有什么方法可以删除 delta 中的旧现有记录并添加新的更新记录。

Delta 中存储的 DataFrame 架构有一个唯一的 Column。通过它我们可以检查记录是更新的还是新的。

标签: azuredatabricksazure-databricksdelta-lakeazure-data-lake-gen2

解决方案


这是Merge 命令的任务- 您定义合并条件(您的唯一列),然后是操作。在 SQL 中,它可能如下所示(column是您的唯一列,并且updates可能是您注册为临时视图的数据框):

MERGE INTO destination
USING updates
ON destination.column = updates.column
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

在 Python 中,它可能如下所示:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/destination/")

deltaTable.alias("dest").merge(
    updatesDF.alias("updates"),
    "dest.column = updates.column") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

推荐阅读