Apache Spark join with null keys

Problem description

My goal is to join two dataframes and pull information from both, even though my join keys can contain null values. Here are my two dataframes:

val data1 = Seq(
  (601, null, null, "8121000868-10", "CN88"),
  (3925, null, null, "8121000936-50",   "CN88")
)

val df1 = data1.toDF("id", "work_order_number", "work_order_item_number", "tally_number", "company_code")

val data2 = Seq(
  (null, null, "8121000868-10", "CN88", "popo"),
  (null, null, "8121000936-50", "CN88", "Smith")
)

val df2 = data2.toDF("work_order_number", "work_order_item_number", "tally_number", "company_code", "name")

What I actually want is to take the "id" from df1, rename it to "tally_summary_id", and be able to re-attach some extra information to each id. Here is my code:

val final_df =
  df1.select(col("id").alias("tally_summary_id"), col("work_order_number"), col("work_order_item_number"),
    col("tally_number"), col("company_code"))
  .join(df2, Seq("tally_number", "work_order_number", "work_order_item_number", "company_code"), "full")

A left join gives me:

+-------------+-----------------+----------------------+------------+----------------+----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id|name|
+-------------+-----------------+----------------------+------------+----------------+----+
|8121000868-10|             null|                  null|        CN88|             601|null|
|8121000936-50|             null|                  null|        CN88|            3925|null|
+-------------+-----------------+----------------------+------------+----------------+----+

A right join gives me:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|            null| popo|
|8121000936-50|             null|                  null|        CN88|            null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

A full join gives me:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|             601| null|
|8121000868-10|             null|                  null|        CN88|            null| popo|
|8121000936-50|             null|                  null|        CN88|            3925| null|
|8121000936-50|             null|                  null|        CN88|            null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

What can I do to get something like this:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|             601|popo |
|8121000936-50|             null|                  null|        CN88|            3925|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

Tags: scala, apache-spark, join, left-join, inner-join

Solution

You can use the null-safe equality operator, <=>.
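As a quick illustration (not part of the original answer), === evaluates to null whenever either operand is null, which is why rows with null keys never match in a plain join, while <=> treats two nulls as equal. A minimal sketch, assuming a local SparkSession:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical local session, just for this demo.
val spark = SparkSession.builder().master("local[*]").appName("null-safe-eq-demo").getOrCreate()
import spark.implicits._

// Three pairs: equal values, both null, one null.
val pairs = Seq(
  (Some("a"), Some("a")),
  (None: Option[String], None: Option[String]),
  (Some("a"), None: Option[String])
).toDF("l", "r")

val checked = pairs.select(
  (col("l") === col("r")).as("eq"),           // null whenever either side is null
  (col("l") <=> col("r")).as("eq_null_safe")  // true for null <=> null, false for value <=> null
)
checked.show()
// eq:           true, null, null
// eq_null_safe: true, true, false
```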

I added a schema to the dataframe creation because, without it, automatic schema inference does not assign a type to columns that contain only nulls, and the join fails.

The resulting dataframe is exactly what you want:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.JavaConversions._  // lets createDataFrame accept the Seq[Row] below

val data1 = Seq(
  Row(601, null, null, "8121000868-10", "CN88"),
  Row(3925, null, null, "8121000936-50", "CN88")
)

val schema1 = StructType(List(
  StructField("id", IntegerType, false),
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true)
))

val df1 = sparkSession.createDataFrame(data1, schema1)

val data2 = Seq(
  Row(null, null, "8121000868-10", "CN88", "popo"),
  Row(null, null, "8121000936-50", "CN88", "Smith")
)

val schema2 = StructType(Seq(
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true),
  StructField("name", StringType, false)
))

val df2 = sparkSession.createDataFrame(data2, schema2)


val final_df =
  df1.join(df2, df1("tally_number") <=> df2("tally_number")
      && df1("work_order_number") <=> df2("work_order_number")
      && df1("work_order_item_number") <=> df2("work_order_item_number")
      && df1("company_code") <=> df2("company_code")
      , "inner")
    .select(df1("tally_number"),
      df1("work_order_number"),
      df1("work_order_item_number"),
      df1("company_code"),
      df1("id").as("tally_summary_id"),
      df2("name"))
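One design note: because the expression form of join keeps both dataframes' copies of each key column, the final select above has to pick df1's copies explicitly. When there are many key columns, the null-safe condition can instead be built from a list of key names. A self-contained sketch of that variant, using the same data as the answer (the "local[*]" session is an assumption for running it standalone):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

val spark = SparkSession.builder().master("local[*]").appName("null-safe-join").getOrCreate()

val schema1 = StructType(Seq(
  StructField("id", IntegerType, false),
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true)))

val df1 = spark.createDataFrame(Seq(
  Row(601, null, null, "8121000868-10", "CN88"),
  Row(3925, null, null, "8121000936-50", "CN88")).asJava, schema1)

val schema2 = StructType(Seq(
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true),
  StructField("name", StringType, false)))

val df2 = spark.createDataFrame(Seq(
  Row(null, null, "8121000868-10", "CN88", "popo"),
  Row(null, null, "8121000936-50", "CN88", "Smith")).asJava, schema2)

// Build one null-safe comparison per key and AND them together,
// then keep only df1's copies of the key columns.
val keys = Seq("tally_number", "work_order_number", "work_order_item_number", "company_code")
val cond = keys.map(k => df1(k) <=> df2(k)).reduce(_ && _)

val final_df = df1.join(df2, cond, "inner")
  .select(keys.map(df1(_)) :+ df1("id").as("tally_summary_id") :+ df2("name"): _*)

final_df.show()
```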
