scala - Spark数据帧组中的最后一个对象
问题描述
我需要为给定的“id”选择最后一个“名字”。可能的解决方案如下:
val channels = sessions
.select($"start_time", $"id", $"name")
.orderBy($"start_time")
.select($"id", $"name")
.groupBy($"id")
.agg(last("name"))
我不知道它是否正确,因为我不确定orderBy
在做之后是否保留它groupBy
。
但这肯定不是一个高性能的解决方案。可能我应该使用reduceByKey
. 我在 spark shell 中尝试了以下操作,它可以工作
val x = sc.parallelize(Array(("1", "T1"), ("2", "T2"), ("1", "T11"), ("1", "T111"), ("2", "T22"), ("1", "T100"), ("2", "T222"), ("2", "T200")), 3)
x.reduceByKey((acc,x) => x).collect
但它不适用于我的数据框。
case class ChannelRecord(id: Long, name: String)
val channels = sessions
.select($"start_time", $"id", $"name")
.orderBy($"start_time")
.select($"id", $"name")
.as[ChannelRecord]
.reduceByKey((acc, x) => x) // take the last object
我得到一个编译错误:值 reduceByKey 不是 org.apache.spark.sql.Dataset 的成员
我想我应该map()
在做之前添加一个电话,reduceByKey
但我不知道我应该映射什么。
解决方案
例如,您可以使用窗口函数来做到这一点。这将需要对id
列进行洗牌并在start_time
.
有两个阶段:
- 获取每个 id 的姓氏
- 只保留姓氏的行(最大开始时间)
示例数据框:
val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
Seq(
Row(1, "a", 1),
Row(1, "b", 2),
Row(1, "c", 3),
Row(2, "d", 4),
Row(2, "e", 5),
Row(2, "f", 6),
Row(3, "g", 7),
Row(3, "h", 8)
))
val schema: StructType = new StructType()
.add(StructField("id", IntegerType, false))
.add(StructField("name", StringType, false))
.add(StructField("start_time", IntegerType, false))
val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)
定义一个窗口。请注意,我在这里按start_time
降序排序。这是为了能够在下一步中选择第一行。
val w = Window.partitionBy("id").orderBy(col("start_time").desc)
然后
df0.withColumn("last_name", first("name").over(w)) // get first name for each id (first because of decreasing start_time)
.withColumn("row_number", row_number().over(w)) // get row number for each id sorted by start_time
.filter("row_number=1") // choose only first rows (first row = max start_time)
.drop("row_number") // get rid of row_number columns
.sort("id")
.show(10, false)
这返回
+---+----+----------+---------+
|id |name|start_time|last_name|
+---+----+----------+---------+
|1 |c |3 |c |
|2 |f |6 |f |
|3 |h |8 |h |
+---+----+----------+---------+
推荐阅读
- keras - 解释 keras.predict 的奇怪输出
- powershell - Powershell 提取最近 n 分钟的日志文件内容
- javascript - 恢复 Fiori 应用程序状态
- python - Kubectl 获取 pod 中的文件创建时间
- kubernetes - Kubernetes 自动将 storageClassName 添加到 PVC
- android - IntelliJ 中缺少部署目标选项
- r - R中的混合模型设计和语法
- arrays - 是否可以在 SwiftUI/Swift 中对类函数内部的数组进行变异?
- console - 为什么当我尝试使用 geth 在本地计算机上建立私有链时,“正在生成 DAG”和“寻找对等点”一直显示?
- php - 在本地机器上工作但抛出:目标类 [App\Http\Controllers\JobsController] 不存在。在 Heroku 上