Fast way to join three DataFrames in Spark SQL

Problem description

PARENT_DATA_FRAME:

+------------+------------+------------+------------+------------+
|key_col_0   |key_col_1   |key_col_2   |key_col_3   |val_0       |
+------------+------------+------------+------------+------------+
|key000000   |key000001   |key000002   |key000003   |val_0       |
|key000010   |key000011   |key000012   |key000013   |val_1       |
|key000020   |key000021   |key000022   |key000023   |val_2       |
|key000030   |key000031   |key000032   |key000033   |val_3       |
|key000040   |key000041   |key000042   |key000043   |val_4       |
+------------+------------+------------+------------+------------+

CHILD_A_DATA_FRAME:

+------------+------------+------------+------------+------------+
|key_col_0   |key_col_1   |key_col_2   |key_col_3   |val_0       |
+------------+------------+------------+------------+------------+
|key000000   |key000001   |key000002   |key000003   |val_0       |
|key000010   |key000011   |key000012   |key000013   |val_1       |
+------------+------------+------------+------------+------------+

CHILD_B_DATA_FRAME:

+------------+------------+------------+------------+------------+
|key_col_0   |key_col_1   |key_col_2   |key_col_3   |val_0       |
+------------+------------+------------+------------+------------+
|key000000   |key000001   |key000002   |key000003   |val_0       |
|key000020   |key000021   |key000022   |key000023   |val_2       |
+------------+------------+------------+------------+------------+

EXPECTED_RESULT:

+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+
|key_col_0   |key_col_1   |key_col_2   |key_col_3   |val_0       |A_CHILD                                                   |B_CHILD                                                   |
+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+
|key000000   |key000001   |key000002   |key000003   |val_0       |array([key000000,key000001,key000002,key000003,val_0])    |array([key000000,key000001,key000002,key000003,val_0])    |
|key000010   |key000011   |key000012   |key000013   |val_1       |array([key000010,key000011,key000012,key000013,val_1])    |array()                                                   |
|key000020   |key000021   |key000022   |key000023   |val_2       |array()                                                   |array([key000020,key000021,key000022,key000023,val_2])    |
|key000030   |key000031   |key000032   |key000033   |val_3       |array()                                                   |array()                                                   |
|key000040   |key000041   |key000042   |key000043   |val_4       |array()                                                   |array()                                                   |
+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+

Given the EXPECTED_RESULT example above, I want to join the three DataFrames PARENT, A_CHILD, and B_CHILD into a single DataFrame. I found a solution, but it is slow:

import org.apache.spark.sql.functions.{col, collect_list, struct}

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"

val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

// Wrap every CHILD_A row in a struct so whole rows can be collected per key,
// then aggregate the structs into an array per key and left-join onto the parent.
val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

// Repeat the same aggregation for CHILD_B and left-join onto the intermediate result.
val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")

How can I do this faster?

Tags: scala, apache-spark, apache-spark-sql

Solution

We can convert these DataFrames to RDDs, turn them into pair RDDs keyed by the key columns, and then use leftOuterJoin twice. We end up with values of the following shape:

((key000000,key000001,key000002,key000003,val_0),(1,Some(1),Some(1)))
((key000010,key000011,key000012,key000013,val_1),(1,Some(1),None))

and so on. You can then map these records into the desired table. Hope this helps.
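
A minimal sketch of that idea, assuming the parentDF, childADF, and childBDF values from the question; the toPairRDD helper, the four-string tuple key, and carrying the whole row along as a Seq are illustrative choices, not something the answer pins down:

import org.apache.spark.sql.DataFrame

// Key each row by the four key columns and keep the whole row as the value.
def toPairRDD(df: DataFrame) =
  df.rdd.map { r =>
    ((r.getString(0), r.getString(1), r.getString(2), r.getString(3)), r.toSeq)
  }

val parentPairs = toPairRDD(parentDF)
val childAPairs = toPairRDD(childADF)
val childBPairs = toPairRDD(childBDF)

// Two leftOuterJoins: every parent key survives, and child rows arrive as
// Options, matching the (parent, Some(childA), None) shapes shown above.
val joined = parentPairs
  .leftOuterJoin(childAPairs)   // (key, (parent, Option[childA]))
  .leftOuterJoin(childBPairs)   // (key, ((parent, Option[childA]), Option[childB]))
  .map { case (key, ((parent, childA), childB)) => (key, (parent, childA, childB)) }

Note that if a key can repeat inside a child table, leftOuterJoin emits one record per match; to reproduce the collect_list-style arrays of the expected result, you would groupByKey each child pair RDD into a list before joining, then map every record into a Row and rebuild a DataFrame.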

