首页 > 解决方案 > Spark SQL 中分组依据和窗口函数如何交互?

问题描述

这个问题中,我了解到窗口函数是在 PostgresSQL 中按函数分组之后评估的。

我想知道当您在 Spark 的同一查询中使用 group by 和 window 函数时会发生什么。我和上一个问题的发帖人有同样的问题:

标签: apache-sparkapache-spark-sqlpyspark-sql

解决方案


如果您在同一查询中有窗口分组依据,那么

  • Group by 执行first然后window函数将应用于 groupby 数据集。

  • 您可以查看查询解释计划以获取更多详细信息。

Example:

//sample data
spark.sql("select * from tmp").show()
//+-------+----+
//|trip_id|name|
//+-------+----+
//|      1|   a|
//|      2|   b|
//+-------+----+


spark.sql("select row_number() over(order by trip_id),trip_id,count(*) cnt from tmp group by trip_id").explain()
//== Physical Plan ==
//*(4) Project [row_number() OVER (ORDER BY trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150, trip_id#10, cnt#140L]
//+- Window [row_number() windowspecdefinition(trip_id#10 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (ORDER BY //trip_id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#150], [trip_id#10 ASC NULLS FIRST]
//   +- *(3) Sort [trip_id#10 ASC NULLS FIRST], false, 0
//      +- Exchange SinglePartition
//         +- *(2) HashAggregate(keys=[trip_id#10], functions=[count(1)])
//            +- Exchange hashpartitioning(trip_id#10, 200)
//               +- *(1) HashAggregate(keys=[trip_id#10], functions=[partial_count(1)])
//                  +- LocalTableScan [trip_id#10]

*(2) groupby executed first

*(4) window function applied on the result of grouped dataset.

如果您有窗口子句subquery并且外部查询有分组,则首先执行子查询(窗口),然后执行外部查询(分组)。

Ex: spark.sql("select trip_id,count(*) from(select *,row_number() over(order by trip_id)rn from tmp)e group by trip_id ").explain()


推荐阅读