scala - A fast way to join three DataFrames in Spark SQL
Question
PARENT_DATA_FRAME:
+------------+------------+------------+------------+------------+
|key_col_0 |key_col_1 |key_col_2 |key_col_3 |val_0 |
+------------+------------+------------+------------+------------+
|key000000 |key000001 |key000002 |key000003 |val_0 |
|key000010 |key000011 |key000012 |key000013 |val_1 |
|key000020 |key000021 |key000022 |key000023 |val_2 |
|key000030 |key000031 |key000032 |key000033 |val_3 |
|key000040 |key000041 |key000042 |key000043 |val_4 |
+------------+------------+------------+------------+------------+
CHILD_A_DATA_FRAME:
+------------+------------+------------+------------+------------+
|key_col_0 |key_col_1 |key_col_2 |key_col_3 |val_0 |
+------------+------------+------------+------------+------------+
|key000000 |key000001 |key000002 |key000003 |val_0 |
|key000010 |key000011 |key000012 |key000013 |val_1 |
+------------+------------+------------+------------+------------+
CHILD_B_DATA_FRAME:
+------------+------------+------------+------------+------------+
|key_col_0 |key_col_1 |key_col_2 |key_col_3 |val_0 |
+------------+------------+------------+------------+------------+
|key000000 |key000001 |key000002 |key000003 |val_0 |
|key000020 |key000021 |key000022 |key000023 |val_2 |
+------------+------------+------------+------------+------------+
Expected result:
+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+
|key_col_0 |key_col_1 |key_col_2 |key_col_3 |val_0 |A_CHILD |B_CHILD |
+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+
|key000000 |key000001 |key000002 |key000003 |val_0 |array([key000000,key000001,key000002,key000003,val_0]) |array([key000000,key000001,key000002,key000003,val_0]) |
|key000010 |key000011 |key000012 |key000013 |val_1 |array([key000010,key000011,key000012,key000013,val_1]) |array() |
|key000020 |key000021 |key000022 |key000023 |val_2 |array() |array([key000020,key000021,key000022,key000023,val_2]) |
|key000030 |key000031 |key000032 |key000033 |val_3 |array() |array() |
|key000040 |key000041 |key000042 |key000043 |val_4 |array() |array() |
+------------+------------+------------+------------+------------+----------------------------------------------------------+----------------------------------------------------------+
Given the EXPECTED_RESULT example above, I want to join the three DataFrames PARENT, A_CHILD, and B_CHILD into a single DataFrame. I found a solution, but it is slow:
import org.apache.spark.sql.functions.{col, collect_list, struct}

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"
val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

// Pack each CHILD_A row into a struct, then collect the structs per key.
val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
// Left join keeps every parent row, matched or not.
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

// Same steps for CHILD_B, joined onto the previous result.
val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")
How can I make this faster?
Answer
We can convert these DataFrames to RDDs and then to pair RDDs, and then apply leftOuterJoin twice. That yields values of the following form:
((key000000,key000001,key000002,key000003,val_0),(1,Some(1),Some(1)))
((key000010,key000011,key000012,key000013,val_1),(1,Some(1),None))
and so on. You can then map them into the desired table. Hope this helps.
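The pair RDD approach described above could be sketched roughly as follows. This is only an illustration under several assumptions that are not in the original answer: all columns are strings, the join key is the four key columns from the question, only val_0 is carried as the value, and the names PairRddJoinSketch, keyed, and joinAll are hypothetical. Each child is grouped by key before joining so that a key with several child rows cannot duplicate the parent row. Note that two chained leftOuterJoin calls produce a nested value of the shape ((v, Option[a]), Option[b]), which the final mapValues flattens. Whether this is actually faster than the DataFrame version depends on the data and the Spark version, so it is worth measuring.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object PairRddJoinSketch {
  // Assumption: the four key columns from the question form the join key.
  type Key = (String, String, String, String)

  // Turn a DataFrame into a pair RDD of (key, val_0).
  def keyed(df: DataFrame): RDD[(Key, String)] =
    df.select("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
      .rdd
      .map { r =>
        ((r.getString(0), r.getString(1), r.getString(2), r.getString(3)), r.getString(4))
      }

  def joinAll(parent: DataFrame, childA: DataFrame, childB: DataFrame)
      : RDD[(Key, (String, List[String], List[String]))] = {
    // Collect each child's values per key first, so the joins stay one-to-one.
    val a = keyed(childA).groupByKey().mapValues(_.toList)
    val b = keyed(childB).groupByKey().mapValues(_.toList)

    keyed(parent)
      .leftOuterJoin(a) // value: (parentVal, Option[List[String]])
      .leftOuterJoin(b) // value: ((parentVal, Option[...]), Option[...])
      .mapValues { case ((v, as), bs) =>
        // Missing children become empty lists, as in the expected result.
        (v, as.getOrElse(Nil), bs.getOrElse(Nil))
      }
  }
}
```

The resulting RDD can then be mapped to whatever row layout is needed, for example by building a case class per record and converting it back with toDF() after importing spark.implicits._ from an existing SparkSession.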