首页 > 解决方案 > 按最大分数在数组中选择项目

问题描述

鉴于以下DataFrame包含一个id和一个 Seq Stuff(带有一个 id 和分数),我如何Stuff按分数选择数组中的“最佳”?

我不想使用 UDF,可能只使用 Spark DataFrame 函数

case class Stuff(id: Int, score: Double)

val df = spark.createDataFrame(Seq(
    (1, Seq(Stuff(11, 0.4), Stuff(12, 0.5))), 
    (2, Seq(Stuff(22, 0.9), Stuff(23, 0.8)))
)).toDF("id", "data")

df.show(false)
+---+----------------------+
|id |data                  |
+---+----------------------+
|1  |[[11, 0.4], [12, 0.5]]|
|2  |[[22, 0.9], [23, 0.8]]|
+---+----------------------+

df.printSchema
root
 |-- id: integer (nullable = false)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = false)
 |    |    |-- score: double (nullable = false)

我试着沿着窗口函数的路线走,但代码有点太复杂了。预期输出:

+---+---------+
|id |topStuff |
+---+---------
|1  |[12, 0.5]|
|2  |[22, 0.9]|
+---+---------+

标签: scalaapache-sparkapache-spark-sql

解决方案


您可以使用 Spark 2.4 高阶函数:

df
  .selectExpr("id","(filter(data, x -> x.score  == array_max(data.score)))[0] as topstuff")
  .show()

+---+---------+
| id| topstuff|
+---+---------+
|  1|[12, 0.5]|
|  2|[22, 0.9]|
+---+---------+

作为替代方案,使用窗口函数(需要改组!):

df
  .select($"id",explode($"data").as("topstuff"))
  .withColumn("selector",max($"topstuff.score") .over(Window.partitionBy($"id")))
  .where($"topstuff.score"===$"selector")
  .drop($"selector")
  .show()

还给出:

+---+---------+
| id| topstuff|
+---+---------+
|  1|[12, 0.5]|
|  2|[22, 0.9]|
+---+---------+

推荐阅读