首页 > 解决方案 > Spark SQL从组中的第一行中选择所有列

问题描述

我有以下数据框,但我无法弄清楚如何提取组的第一行的所有列。

+--------------------+------------+--------+
|           timestamp|nanos       |file_idx|
+--------------------+------------+--------+
|2018-09-07 05:00:...|    64044267|      1 |
|2018-09-07 05:00:...|    64044267|      2 |
|2018-09-07 05:00:...|    58789223|      3 |
+--------------------+------------+--------+

如何在相同的时间戳和纳秒内提取具有最大 file_idx 的行?我试过使用 groupBy 函数,但它只返回我的 group by 子句中的那些列,实际上这个表包含 160 列。

上述示例中的预期结果是

+--------------------+------------+--------+
|           timestamp|nanos       |file_idx|
+--------------------+------------+--------+
|2018-09-07 05:00:...|    64044267|      2 |
|2018-09-07 05:00:...|    58789223|      3 |
+--------------------+------------+--------+

标签: apache-sparkapache-spark-sql

解决方案


使用row_number()带有partitionBy("timestamp","nanos") 和orderby("file_idx") 降序的窗口函数最终仅过滤highest file_idx窗口中的行。

Example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//sample data
df.show()

//+----------------+--------+--------+
//|       timestamp|   nanos|file_idx|
//+----------------+--------+--------+
//|2018-09-07 05:00|64044267|       1|
//|2018-09-07 05:00|64044267|       2|
//|2018-09-07 05:00|58789223|       3|
//+----------------+--------+--------+

val windowSpec = Window.partitionBy("timestamp","nanos").orderBy(desc("file_idx"))

df.withColumn("new_idx",row_number().over(windowSpec)).
filter(col("new_idx") ===1).
drop("new_idx").
show()

//+----------------+--------+--------+
//|       timestamp|   nanos|file_idx|
//+----------------+--------+--------+
//|2018-09-07 05:00|64044267|       2|
//|2018-09-07 05:00|58789223|       3|
//+----------------+--------+--------+

推荐阅读