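scala - Apache Spark join with null keys
Problem description
My goal is to join two DataFrames and get information from both, even though my join keys can contain null values. Here are my two DataFrames: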
val data1 = Seq(
(601, null, null, "8121000868-10", "CN88"),
(3925, null, null, "8121000936-50", "CN88")
)
val df1 = data1.toDF("id", "work_order_number", "work_order_item_number", "tally_number", "company_code")
val data2 = Seq(
(null, null, "8121000868-10", "CN88", "popo"),
(null, null, "8121000936-50", "CN88", "Smith")
)
val df2 = data2.toDF("work_order_number", "work_order_item_number", "tally_number", "company_code", "name")
In fact, my goal is to take "id" from df1, rename it to "tally_summary_id", and be able to attach some other information to each id. Here is my code:
val final_df =
df1.select(col("id").alias("tally_summary_id"), col("work_order_number"), col("work_order_item_number"),
col("tally_number"), col("company_code"))
.join(df2, Seq("tally_number", "work_order_number", "work_order_item_number", "company_code"), "full")
A left join gives me:
+-------------+-----------------+----------------------+------------+----------------+----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id|name|
+-------------+-----------------+----------------------+------------+----------------+----+
|8121000868-10| null| null| CN88| 601|null|
|8121000936-50| null| null| CN88| 3925|null|
+-------------+-----------------+----------------------+------------+----------------+----+
A right join gives me:
+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10| null| null| CN88| null| popo|
|8121000936-50| null| null| CN88| null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+
A full join gives me:
+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10| null| null| CN88| 601| null|
|8121000868-10| null| null| CN88| null| popo|
|8121000936-50| null| null| CN88| 3925| null|
|8121000936-50| null| null| CN88| null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+
What can I do to get a result like this:
+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10| null| null| CN88| 601| popo|
|8121000936-50| null| null| CN88| 3925|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+
Solution
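You can use the null-safe equality operator, <=>. A plain equality comparison between two nulls does not evaluate to true, so rows whose join keys are null never match; <=> treats two nulls as equal.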
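To make the difference between the two comparisons concrete, here is a minimal plain-Scala sketch of the semantics, with no Spark involved. It models a nullable value as an `Option`; the names `sqlEquals`, `nullSafeEquals` and `matches` are made up for illustration and are not Spark APIs.

```scala
object NullSafeEqualitySketch {
  // Models SQL `=`: the result is unknown (None) as soon as either side is null.
  def sqlEquals[A](a: Option[A], b: Option[A]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Models SQL `<=>`: always true or false, and two nulls compare equal.
  def nullSafeEquals[A](a: Option[A], b: Option[A]): Boolean = a == b

  // A join keeps a pair of rows only when the condition is definitely true.
  def matches(result: Option[Boolean]): Boolean = result.contains(true)

  def main(args: Array[String]): Unit = {
    val missing: Option[String] = None
    println(matches(sqlEquals(missing, missing)))  // false: null = null is unknown
    println(nullSafeEquals(missing, missing))      // true: null <=> null
    println(nullSafeEquals(Some("CN88"), missing)) // false
  }
}
```

This is why the left, right and full joins in the question keep the rows from each side apart: the null work order columns never satisfy an ordinary equality condition.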
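I added an explicit schema when creating the DataFrames, because without one, automatic schema inference cannot assign a type to columns that contain only nulls, and the join fails.
The resulting DataFrame is exactly what you asked for: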
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// `sparkSession` is assumed to be an existing SparkSession (e.g. `spark` in spark-shell).
// The rows are parallelized into an RDD so that createDataFrame(RDD[Row], StructType) applies;
// this avoids relying on the deprecated scala.collection.JavaConversions implicits.
val data1 = Seq(
  Row(601, null, null, "8121000868-10", "CN88"),
  Row(3925, null, null, "8121000936-50", "CN88")
)
val schema1 = StructType(List(
  StructField("id", IntegerType, false),
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true)
))
val df1 = sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data1), schema1)

val data2 = Seq(
  Row(null, null, "8121000868-10", "CN88", "popo"),
  Row(null, null, "8121000936-50", "CN88", "Smith")
)
val schema2 = StructType(Seq(
  StructField("work_order_number", StringType, true),
  StructField("work_order_item_number", StringType, true),
  StructField("tally_number", StringType, true),
  StructField("company_code", StringType, true),
  StructField("name", StringType, false)
))
val df2 = sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data2), schema2)

// Join on all four keys with the null-safe operator, so null keys on both sides match.
val final_df =
  df1.join(df2,
      df1("tally_number") <=> df2("tally_number")
        && df1("work_order_number") <=> df2("work_order_number")
        && df1("work_order_item_number") <=> df2("work_order_item_number")
        && df1("company_code") <=> df2("company_code"),
      "inner")
    .select(
      df1("tally_number"),
      df1("work_order_number"),
      df1("work_order_item_number"),
      df1("company_code"),
      df1("id").as("tally_summary_id"),
      df2("name"))
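If the list of key columns is long or changes often, the null-safe condition can be built from the key names instead of being written out by hand. The following is only a sketch under a few assumptions: `nullSafeJoin` and `NullSafeJoinSketch` are hypothetical names, the session runs on a local master, and the all-null columns are typed as `Option[String]` so that `toDF` can infer a schema, as an alternative to the explicit `StructType` used above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object NullSafeJoinSketch {
  // Hypothetical helper: inner-joins on `keys` with <=>, so null keys match each other.
  // `keys` must be non-empty, otherwise reduce throws.
  def nullSafeJoin(left: DataFrame, right: DataFrame, keys: Seq[String]): DataFrame = {
    val condition = keys.map(k => left(k) <=> right(k)).reduce(_ && _)
    val joined = left.join(right, condition, "inner")
    // Drop the right-hand copies of the key columns to avoid duplicate names.
    keys.foldLeft(joined)((df, k) => df.drop(right(k)))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a throwaway local session, just for this example.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("null-safe-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Typing the missing values as Option[String] lets Spark infer StringType.
    val none: Option[String] = None
    val df1 = Seq(
      (601, none, none, "8121000868-10", "CN88"),
      (3925, none, none, "8121000936-50", "CN88")
    ).toDF("id", "work_order_number", "work_order_item_number", "tally_number", "company_code")
    val df2 = Seq(
      (none, none, "8121000868-10", "CN88", "popo"),
      (none, none, "8121000936-50", "CN88", "Smith")
    ).toDF("work_order_number", "work_order_item_number", "tally_number", "company_code", "name")

    val keys = Seq("tally_number", "work_order_number", "work_order_item_number", "company_code")
    val result = nullSafeJoin(df1, df2, keys).withColumnRenamed("id", "tally_summary_id")
    result.show()

    spark.stop()
  }
}
```

With the sample data from the question, this should pair id 601 with "popo" and id 3925 with "Smith". The column order differs from the hand-written `select` above, so add a final `select` if the order matters.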