首页 > 解决方案 > 如何在 Apache Spark 中执行 UPSERT 或 MERGE 操作?

问题描述

我正在尝试使用 Apache Spark 使用唯一列“ID”更新并将记录插入旧数据框。

标签: scalaapache-spark

解决方案


为了更新 Dataframe,您可以对唯一列执行“left_anti”连接,然后将其与包含新记录的 Dataframe 合并

def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String]): Dataset[_] = {
    val filteredNewDS = selectAndCastColumns(newDS, oldDS)
    oldDS.join(
      filteredNewDS,
      usingColumns,
      "left_anti")
      .select(oldDS.columns.map(columnName => col(columnName)): _*)
      .union(filteredNewDS.toDF)
  }

  def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
    val columns = ds.columns.toSet
    ds.select(refDS.columns.map(c => {
      if (!columns.contains(c)) {
        lit(null).cast(refDS.schema(c).dataType) as c
      } else {
        ds(c).cast(refDS.schema(c).dataType) as c
      }
    }): _*)
  }

val df = refreshUnion(oldDS, newDS, Seq("ID"))

推荐阅读