scala - 如何在 Apache Spark 中执行 UPSERT 或 MERGE 操作?
问题描述
我正在尝试使用 Apache Spark 使用唯一列“ID”更新并将记录插入旧数据框。
解决方案
为了更新 Dataframe,您可以对唯一列执行“left_anti”连接,然后将其与包含新记录的 Dataframe 合并
def refreshUnion(oldDS: Dataset[_], newDS: Dataset[_], usingColumns: Seq[String]): Dataset[_] = {
val filteredNewDS = selectAndCastColumns(newDS, oldDS)
oldDS.join(
filteredNewDS,
usingColumns,
"left_anti")
.select(oldDS.columns.map(columnName => col(columnName)): _*)
.union(filteredNewDS.toDF)
}
def selectAndCastColumns(ds: Dataset[_], refDS: Dataset[_]): Dataset[_] = {
val columns = ds.columns.toSet
ds.select(refDS.columns.map(c => {
if (!columns.contains(c)) {
lit(null).cast(refDS.schema(c).dataType) as c
} else {
ds(c).cast(refDS.schema(c).dataType) as c
}
}): _*)
}
val df = refreshUnion(oldDS, newDS, Seq("ID"))