dataframe spark scala take the (MAX-MIN) for each group

Problem Description

I have a DataFrame coming out of a processing step that looks like this:

+---------+------+-----------+
|Time     |group |value      |
+---------+------+-----------+
|    28371|    94|        906|
|    28372|    94|        864|
|    28373|    94|        682|
|    28374|    94|        574|
|    28383|    95|        630|
|    28384|    95|        716|
|    28385|    95|        913|
+---------+------+-----------+

I would like to compute, for each group, (the value at the max Time minus the value at the min Time), to get this result:

+------+-----------+
|group |value      |
+------+-----------+
|    94|       -332|
|    95|        283|
+------+-----------+
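(For group 94, for example, the value at the max time 28374 is 574 and the value at the min time 28371 is 906, so the expected result is 574 - 906 = -332.)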

Thank you in advance for the help

Tags: scala, apache-spark, apache-spark-sql

Solution


If all that is needed is the difference between the largest and smallest value in each group, a single aggregation is enough:

import org.apache.spark.sql.functions.{max, min}

df.groupBy("groupCol").agg(max("value") - min("value"))

Based on the question edit by the OP, here is a way to do this in PySpark. The idea is to compute row numbers per group in both ascending and descending order of time, and then subtract the value on the min-time row from the value on the max-time row.

from pyspark.sql import Window
from pyspark.sql import functions as func

# Row numbers per group, in ascending and descending order of time
w_asc = Window.partitionBy(df.groupCol).orderBy(df.time)
w_desc = Window.partitionBy(df.groupCol).orderBy(func.desc(df.time))

# withColumn takes the new column name first, then the column expression
df = df.withColumn('rnum_asc', func.row_number().over(w_asc)) \
       .withColumn('rnum_desc', func.row_number().over(w_desc))

# rnum_desc == 1 marks the max-time row, rnum_asc == 1 the min-time row
df.groupBy(df.groupCol) \
  .agg((func.max(func.when(df.rnum_desc == 1, df.value))
        - func.max(func.when(df.rnum_asc == 1, df.value))).alias('diff')) \
  .show()
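Since the question is tagged scala, here is a sketch of the same approach with the Scala DataFrame API. The column names groupCol, time and value follow the PySpark snippet above and are placeholders for the actual names in the question (group, Time, value):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, max, row_number, when}

// Rank rows per group by time, ascending and descending
val wAsc  = Window.partitionBy("groupCol").orderBy(col("time"))
val wDesc = Window.partitionBy("groupCol").orderBy(desc("time"))

val ranked = df.withColumn("rnum_asc", row_number().over(wAsc))
               .withColumn("rnum_desc", row_number().over(wDesc))

// value on the max-time row minus value on the min-time row, per group
ranked.groupBy("groupCol")
  .agg((max(when(col("rnum_desc") === 1, col("value"))) -
        max(when(col("rnum_asc") === 1, col("value")))).alias("diff"))
  .show()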

It would have been easier if the window function first_value were available in Spark SQL. A generic way to solve this using SQL is:

select distinct groupCol, diff
from (
  select t.*,
         first_value(value) over (partition by groupCol order by time desc) -
         first_value(value) over (partition by groupCol order by time) as diff
  from tbl t
) t
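If you prefer the SQL form, one way to run it from Scala is to register the DataFrame as a temporary view first. This is a sketch, assuming an existing SparkSession named spark; groupCol, time and value are again placeholders for the real column names:

// Register the DataFrame as a temporary view so the query above can run against it
df.createOrReplaceTempView("tbl")

val diffs = spark.sql("""
  select distinct groupCol, diff
  from (
    select t.*,
           first_value(value) over (partition by groupCol order by time desc) -
           first_value(value) over (partition by groupCol order by time) as diff
    from tbl t
  ) t
""")
diffs.show()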
