首页 > 解决方案 > 如何连接两个数据集在 Spark Scala 中,当一个键按顺序排列时没有爆炸

问题描述

问题陈述

我有三个案例类:

case class Trip (val board: String, val off: String)
case class Description(val key: Int, val airline: String, val trips : Seq[Trip])

case class TripDuration (val airline: String, val board: String, val off: String, val duration: Int)

和两个数据集:

Dataset<Description> DescriptionDS
Dataset<TripDuration> TripDurationDS

我想要的是获得一个数据集类型

case class TripWithDuration (val board: String, val off: String, duration: Int)
case class DescriptionExtended(val key, val airline, val tripsExtended: Seq[TripWithDuration ])

Dataset<TripDurationExtended> TripDurationExtendedDS

所以,加入为了添加Duration。全部使用数据集 API。

可能的解决方案

我知道这可以通过混合方法来完成,在Seq列上进行第一次分解,然后在 Key 上加入 + group by。

如果列数很大,这可能不是最优的,因为在 group by 中,您必须为每个列指定分组函数。

数据名方法

DescriptionDS.select($"key", $"airline", explode( $"trips").as("trip") )
.join(TripDurationDS, Seq($"airline", $"trip.board", $"trip.off"))
.select(<existing cols>, struct(<with board, off, and duration>).as("tripExtended"))
.groupBy($"key").agg(first($"airline"), ..., collect_list($"tripExtended")) // FOr all the columns
.as[TripDurationExtended] // Or somehing similar

数据集方法

DescriptionDS
.joinWith(TripDurationDS, DescriptionDS("airline") === TripDurationDS("airline")
       and <???> )

问题

* 用这种方法可以做到吗?*

* 什么是 - 如果存在 - 使用类型化方法执行此操作的 api?*

标签: scalaapache-sparkjoinapache-spark-datasetseq

解决方案


推荐阅读