首页 > 解决方案 > 在 Spark Join [Scala] 中包含空值

问题描述

我有 2 个 dfs,我想对所有列进行内部连接

val seq1 = Seq((Some("1"), Some("Cat")), (Some("2"), Some("Dog")), (Some("3"), None))
val df1 = seq1.toDF("id", "name")

val s2 = Seq((Some("1"), Some("Cat")), (Some("2"), Some("Dog")), (Some("3"), None))
val df2 = s2.toDF("id", "name")

val s3 = Seq((Some("1"), Some("Cat")), (Some("2"), Some("Dog")), (Some("3"), None))
val df3 = s3.toDF("id", "name")

我想得到df1.join(df2, df1.columns, "inner")df3,但现在它不包括("3", null).

<=>我看到了一些使用or的答案,.eqNullSafe但我不确定如何将其应用于 scala 代码。我想要一个可以应用于任何 dfs 的通用解决方案 - 事先不知道列的名称。

我可以做类似的事情 df1.join(df2, df1.columns.map(c => col(c).eqNullSafe()): _*, "inner")吗?编译器不喜欢它,但这就是想法。

标签: scalaapache-sparkapache-spark-sql

解决方案


你可以像这样建立你的自定义连接条件:

val joinCondition = df1.columns.foldLeft(lit(true))((acc,c) => acc and  (df1(c) === df2(c) or df1(c).isNull or df2(c).isNull))

df1.join(df2, joinCondition, "inner")
  .select(df1("*"))

但是由于你df2的测试用例是空的,这仍然会导致一个空的结果。你不能使用union或只是一个left-join


推荐阅读