scala - 如何连接两个数据集在 Spark Scala 中,当一个键按顺序排列时没有爆炸
问题描述
问题陈述
我有三个案例类:
case class Trip (val board: String, val off: String)
case class Description(val key: Int, val airline: String, val trips : Seq[Trip])
case class TripDuration (val airline: String, val board: String, val off: String, val duration: Int)
和两个数据集:
Dataset<Description> DescriptionDS
Dataset<TripDuration> TripDurationDS
我想要的是获得一个数据集类型
case class TripWithDuration (val board: String, val off: String, duration: Int)
case class DescriptionExtended(val key, val airline, val tripsExtended: Seq[TripWithDuration ])
Dataset<TripDurationExtended> TripDurationExtendedDS
所以,加入为了添加Duration。全部使用数据集 API。
可能的解决方案
我知道这可以通过混合方法来完成,在Seq列上进行第一次分解,然后在 Key 上加入 + group by。
如果列数很大,这可能不是最优的,因为在 group by 中,您必须为每个列指定分组函数。
数据名方法
DescriptionDS.select($"key", $"airline", explode( $"trips").as("trip") )
.join(TripDurationDS, Seq($"airline", $"trip.board", $"trip.off"))
.select(<existing cols>, struct(<with board, off, and duration>).as("tripExtended"))
.groupBy($"key").agg(first($"airline"), ..., collect_list($"tripExtended")) // FOr all the columns
.as[TripDurationExtended] // Or somehing similar
数据集方法
DescriptionDS
.joinWith(TripDurationDS, DescriptionDS("airline") === TripDurationDS("airline")
and <???> )
问题
* 用这种方法可以做到吗?*
* 什么是 - 如果存在 - 使用类型化方法执行此操作的 api?*
解决方案
推荐阅读
- android - 按下主页按钮时如何停止服务?
- java - Java:如何在变量中“存储”getter?
- node.js - 用户登录在 Angular 中无法正常工作?
- python - 基于参数的模拟 sqlalchemy 查询。例如:db.session.query(Model.id)
- r - 如何将打印值放入矩阵
- android - 如何在 Hilt 中生成相同类型的对象?
- reactjs - React – 如何有条件地返回一个值(或回退)
- java - AndroidOS中imageview.getImageMatrix()的HarmonyOS替代方法是什么
- reactjs - 通过 React 中自定义事件的详细信息传输的输出数据
- java - 我们可以在数据库中的模式内有模式吗