scala - 按最大分数在数组中选择项目
问题描述
鉴于以下DataFrame
包含一个id
和一个 Seq Stuff
(带有一个 id 和分数),我如何Stuff
按分数选择数组中的“最佳”?
我不想使用 UDF,可能只使用 Spark DataFrame 函数。
case class Stuff(id: Int, score: Double)
val df = spark.createDataFrame(Seq(
(1, Seq(Stuff(11, 0.4), Stuff(12, 0.5))),
(2, Seq(Stuff(22, 0.9), Stuff(23, 0.8)))
)).toDF("id", "data")
df.show(false)
+---+----------------------+
|id |data |
+---+----------------------+
|1 |[[11, 0.4], [12, 0.5]]|
|2 |[[22, 0.9], [23, 0.8]]|
+---+----------------------+
df.printSchema
root
|-- id: integer (nullable = false)
|-- data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: integer (nullable = false)
| | |-- score: double (nullable = false)
我试着沿着窗口函数的路线走,但代码有点太复杂了。预期输出:
+---+---------+
|id |topStuff |
+---+---------
|1 |[12, 0.5]|
|2 |[22, 0.9]|
+---+---------+
解决方案
您可以使用 Spark 2.4 高阶函数:
df
.selectExpr("id","(filter(data, x -> x.score == array_max(data.score)))[0] as topstuff")
.show()
给
+---+---------+
| id| topstuff|
+---+---------+
| 1|[12, 0.5]|
| 2|[22, 0.9]|
+---+---------+
作为替代方案,使用窗口函数(需要改组!):
df
.select($"id",explode($"data").as("topstuff"))
.withColumn("selector",max($"topstuff.score") .over(Window.partitionBy($"id")))
.where($"topstuff.score"===$"selector")
.drop($"selector")
.show()
还给出:
+---+---------+
| id| topstuff|
+---+---------+
| 1|[12, 0.5]|
| 2|[22, 0.9]|
+---+---------+
推荐阅读
- r - 对数据进行分类和分组
- reactjs - React Axios 发布 Oauth2 用户名和密码在 FastAPI 后端失败
- c# - 与添加到单个 BlockingCollection 相比,BlockingCollection.AddToAny 方法是否提供任何性能优势?
- c - 尝试创建两个使用计数器打印其进程 ID 的子进程和一个循环以递增计数器
- python - 如何从另一台机器在树莓派上远程运行 python 脚本?
- java - Kafka Consumer:“散列到队列中”是什么意思?
- webpack - leveldown : No native build was found for platform=linux arch=x64 runtime=electron abi=82 uv=1 libc=glibc node=12.16.3 electron=10.2.0 webpack=true
- node.js - RSS-Parse 从项目中获取属性
- sql - postgres: does where clause filters applied in order they return
- python-3.x - Slow numpy and pandas imports on Google Cloud Run