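scala - Group by column "grp" and compress a DataFrame (take the last non-null value of each column, ordered by column "ord")
Question
Suppose I have the following DataFrame: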
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 3|null| 11|
| 2| null| 2| xxx| 22|
| 1| null| 1| yyy|null|
| 2| null| 7|null| 33|
| 1| null| 12|null|null|
| 2| null| 19|null| 77|
| 1| null| 10| s13|null|
| 2| null| 11| a23|null|
+---+--------+---+----+----+
Here is the same example DF with comments, sorted by grp and ord:
scala> df.orderBy("grp", "ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 1| yyy|null|
| 1| null| 3|null| 11| # grp:1 - last value for `col2` (11)
| 1| null| 10| s13|null| # grp:1 - last value for `col1` (s13)
| 1| null| 12|null|null| # grp:1 - last values for `null_col`, `ord`
| 2| null| 2| xxx| 22|
| 2| null| 7|null| 33|
| 2| null| 11| a23|null| # grp:2 - last value for `col1` (a23)
| 2| null| 19|null| 77| # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+
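I would like to compress it: group it by column "grp" and, for each group, sort the rows by column "ord" and take the last non-null value in each column (if there is one).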
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
| 1| null| 12| s13| 11|
| 2| null| 19| a23| 77|
+---+--------+---+----+----+
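To pin down the intended semantics, here is a minimal plain-Scala sketch of the same rule on ordinary collections, without Spark. The case class Rec and the function compress are hypothetical names used only for illustration, and the all-null column null_col is left out for brevity. The columns are hard-coded here, so this models the requirement only, not the dynamic solution being asked for.

```scala
// Plain-Scala model of the requirement (no Spark); all names are illustrative.
final case class Rec(grp: Int, ord: Int, col1: Option[String], col2: Option[Int])

val rows = Seq(
  Rec(1, 3, None, Some(11)),
  Rec(2, 2, Some("xxx"), Some(22)),
  Rec(1, 1, Some("yyy"), None),
  Rec(2, 7, None, Some(33)),
  Rec(1, 12, None, None),
  Rec(2, 19, None, Some(77)),
  Rec(1, 10, Some("s13"), None),
  Rec(2, 11, Some("a23"), None)
)

// For each group: sort by ord, then keep the last defined value of every column.
def compress(rs: Seq[Rec]): Seq[Rec] =
  rs.groupBy(_.grp).toSeq.sortBy(_._1).map { case (g, grpRows) =>
    val sorted = grpRows.sortBy(_.ord)
    Rec(
      grp  = g,
      ord  = sorted.last.ord,                    // last ord in the group
      col1 = sorted.flatMap(_.col1).lastOption,  // last non-null col1, if any
      col2 = sorted.flatMap(_.col2).lastOption   // last non-null col2, if any
    )
  }

val compressed = compress(rows)
```

A column that is null in every row of a group simply stays None, which matches the "if there is one" part of the requirement.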
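I have seen similar questions, but my real DataFrame has more than 250 columns, so I need a solution that does not require listing every column explicitly.
I can't wrap my head around it...
MCVE: how to create the sample DataFrame:
- Create a local file "/tmp/data.txt" and copy and paste the content of the DataFrame into it (as posted above)
- Define the function readSparkOutput()
- Parse "/tmp/data.txt" into a DataFrame: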
val df = readSparkOutput("file:///tmp/data.txt")
UPDATE: I think it should be similar to the following SQL:
SELECT
grp, ord, null_col, col1, col2
FROM (
SELECT
grp,
ord,
FIRST(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) as null_col,
FIRST(col1) OVER (PARTITION BY grp ORDER BY ord DESC) as col1,
FIRST(col2) OVER (PARTITION BY grp ORDER BY ord DESC) as col2,
ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) as rn
FROM table_name) as v
WHERE v.rn = 1;
How can we generate such a Spark query dynamically?
I tried the following simplified approach:
import org.apache.spark.sql.expressions.Window
val win = Window
.partitionBy("grp")
.orderBy($"ord".desc)
val cols = df.columns.map(c => first(c, ignoreNulls=true).over(win).as(c))
which produces:
scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`, first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`, first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`, first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`, first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)
But I can't pass it to df.select:
scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
df.select(cols.head, cols.tail: _*).show
Another attempt:
scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
found : String => org.apache.spark.sql.Column
required: org.apache.spark.sql.Column => ?
df.select(cols.map(col): _*).show
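Both errors appear to come from how the array is handed to select. Dataset.select has a (Column*) overload and a (String, String*) overload, but none of the form (Column, Column*), and cols already holds Column values, so mapping col (a String => Column function) over it does not type-check. The sketch below passes the array directly with `cols: _*`. It also widens the window frame to the whole partition, because with the default frame each row only sees the rows up to itself. The local SparkSession, the inline sample data, and the string type given to null_col are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("compress-sketch").getOrCreate()
import spark.implicits._

// Sample data from the question; null_col is typed as a nullable string here.
val df = Seq(
  (1, Option.empty[String], 3,  Option.empty[String], Option(11)),
  (2, Option.empty[String], 2,  Option("xxx"),        Option(22)),
  (1, Option.empty[String], 1,  Option("yyy"),        Option.empty[Int]),
  (2, Option.empty[String], 7,  Option.empty[String], Option(33)),
  (1, Option.empty[String], 12, Option.empty[String], Option.empty[Int]),
  (2, Option.empty[String], 19, Option.empty[String], Option(77)),
  (1, Option.empty[String], 10, Option("s13"),        Option.empty[Int]),
  (2, Option.empty[String], 11, Option("a23"),        Option.empty[Int])
).toDF("grp", "null_col", "ord", "col1", "col2")

// Whole-partition frame: every row of a group sees all rows of that group.
val win = Window
  .partitionBy("grp")
  .orderBy("ord")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// One window expression per column, generated from df.columns.
val cols = df.columns.map(c => last(c, ignoreNulls = true).over(win).as(c))

// `cols` is already an Array[Column], so it is expanded directly with `: _*`.
// After the select, all rows of a group are identical, so one row per grp is kept.
val compressed = df.select(cols: _*).dropDuplicates("grp").orderBy("grp")
compressed.show()
```

Because every row of a group carries the same values after the select, it does not matter which row dropDuplicates("grp") keeps.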
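Answer
Consider the following approach, which applies the Window function last(c, ignoreNulls=true), ordered by "ord" within each "grp", to each of the selected columns, followed by a groupBy("grp") that fetches the first agg(colFcnMap) result: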
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df0 = Seq(
(1, 3, None, Some(11)),
(2, 2, Some("aaa"), Some(22)),
(1, 1, Some("s12"), None),
(2, 7, None, Some(33)),
(1, 12, None, None),
(2, 19, None, Some(77)),
(1, 10, Some("s13"), None),
(2, 11, Some("a23"), None)
).toDF("grp", "ord", "col1", "col2")
val df = df0.withColumn("null_col", lit(null))
df.orderBy("grp", "ord").show
// +---+---+----+----+--------+
// |grp|ord|col1|col2|null_col|
// +---+---+----+----+--------+
// | 1| 1| s12|null| null|
// | 1| 3|null| 11| null|
// | 1| 10| s13|null| null|
// | 1| 12|null|null| null|
// | 2| 2| aaa| 22| null|
// | 2| 7|null| 33| null|
// | 2| 11| a23|null| null|
// | 2| 19|null| 77| null|
// +---+---+----+----+--------+
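// Frame runs from the current row to the end of the group, so last() sees every later row
val win = Window.partitionBy("grp").orderBy("ord").
rowsBetween(Window.currentRow, Window.unboundedFollowing)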
val nonAggCols = Array("grp")
val cols = df.columns.diff(nonAggCols) // Columns to be aggregated
val colFcnMap = cols.zip(Array.fill(cols.size)("first")).toMap
// colFcnMap: scala.collection.immutable.Map[String,String] =
// Map(ord -> first, col1 -> first, col2 -> first, null_col -> first)
cols.foldLeft(df)((acc, c) =>
acc.withColumn(c, last(c, ignoreNulls=true).over(win))
).
groupBy("grp").agg(colFcnMap).
select(col("grp") :: colFcnMap.toList.map{case (c, f) => col(s"$f($c)").as(c)}: _*).
show
// +---+---+----+----+--------+
// |grp|ord|col1|col2|null_col|
// +---+---+----+----+--------+
// | 1| 12| s13| 11| null|
// | 2| 19| a23| 77| null|
// +---+---+----+----+--------+
Note that the final select is there to strip the function name (in this case first()) from the aggregated column names.
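As a possible alternative that is not part of the answer above, the same result can be sketched with a single groupBy and no window: for each column, take the maximum of struct(ord, column) over the rows where the column is non-null, then extract the value field. Structs compare field by field, so "ord" decides which row wins. The local SparkSession and the typed sample data are assumptions for illustration, "ord" is assumed to be non-null, and if two rows of a group shared the same "ord" the larger value would win the tie.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, struct, when}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("max-struct-sketch").getOrCreate()
import spark.implicits._

// Same sample data as in the answer, without the all-null column.
val df = Seq(
  (1, 3,  Option.empty[String], Option(11)),
  (2, 2,  Option("aaa"),        Option(22)),
  (1, 1,  Option("s12"),        Option.empty[Int]),
  (2, 7,  Option.empty[String], Option(33)),
  (1, 12, Option.empty[String], Option.empty[Int]),
  (2, 19, Option.empty[String], Option(77)),
  (1, 10, Option("s13"),        Option.empty[Int]),
  (2, 11, Option("a23"),        Option.empty[Int])
).toDF("grp", "ord", "col1", "col2")

// Every column except the grouping key and the ordering column.
val valueCols = df.columns.diff(Array("grp", "ord"))

// when(...) without otherwise yields null for null values, and max skips nulls,
// so only rows where the column is non-null compete; the highest "ord" wins.
val aggs = valueCols.map { c =>
  max(when(col(c).isNotNull, struct(col("ord"), col(c)))).getField(c).as(c)
}

val compressed = df
  .groupBy("grp")
  .agg(max("ord").as("ord"), aggs: _*)
  .orderBy("grp")
compressed.show()
```

This variant produces the final column names directly, so no renaming select is needed afterwards.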