首页 > 解决方案 > 在 spark sql 的第三列中找到具有最大值的两列组合。使用此列组合查找历史最小值和最大值

问题描述

我有一张带有下表的蜂巢表。我必须找到 Instance 和 name 的正确组合,并且对于最新的循环码具有最大值。Hive 表也有多个 Instance。对于每个实例,我都找到了正确的实例,名称具有最新循环码的最大值

 Instance   name     value    cyclecode
 A37        ratio.1  10       1
 A37        ratio.2  20       1
 A37        ratio.3  90       1
 A37        ratio.1  10       2
 A37        ratio.2  20       2
 A37        ratio.3  30       2
 A37        ratio.1  10       3
 A37        ratio.2  12       3
 A37        ratio.3  80       3

预期输出:

Instance   name     value    cyclecode
 A37        ratio.3  80       3

使用这种组合,我必须使用以前的循环码数据找到历史最大值和最小值

预期输出:

Instance   name     Historical_min     Historical_max
 A37        ratio.3  30                   90

我在 Spark-sql 下尝试过。但我没有得到预期的输出:

spark.sql("WITH pick_val_max (select MAX(value) as val_max from table WHERE cycle_code = 3 limit 1) SELECT instance, name, value from table, pick_val_max WHERE name RLIKE 'Histogram.ratio' AND cycle_code = 3 and value = pick_val_max.val_max").show(truncate=false)

标签: apache-sparkapache-spark-sql

解决方案


from pyspark.sql import Window, functions as F

df_origin = spark.createDataFrame(
[
{ "Instance":"A37", "name" : "ratio.1","value": 10, "cyclecode" : 1},
{ "Instance":"A37", "name" : "ratio.2","value": 20, "cyclecode" : 1},
{ "Instance":"A37", "name" : "ratio.3","value": 90, "cyclecode" : 1},
{ "Instance":"A37", "name" : "ratio.1","value": 10, "cyclecode" : 2},
{ "Instance":"A37", "name" : "ratio.2","value": 20, "cyclecode" : 2},
{ "Instance":"A37", "name" : "ratio.3","value": 30, "cyclecode" : 2},
{ "Instance":"A37", "name" : "ratio.1","value": 10, "cyclecode" : 3},
{ "Instance":"A37", "name" : "ratio.2","value": 12, "cyclecode" : 3},
{ "Instance":"A37", "name" : "ratio.3","value": 80, "cyclecode" : 3}
])

w = Window().orderBy(F.col('cyclecode').desc(), F.col('value').desc())
df = df_origin.withColumn('rn', F.row_number().over(w))

df = df.filter('rn = 1').drop('rn')
df = df_origin.join(df, ['Instance', 'name'], 'leftsemi')

res_df = df.groupBy('Instance', 'name').agg(F.max('value').alias('Historical_max'), F.min('value').alias('Historical_min'))

输出:

+--------+-------+--------------+--------------+
|Instance|   name|Historical_max|Historical_min|
+--------+-------+--------------+--------------+
|     A37|ratio.3|            90|            30|
+--------+-------+--------------+--------------+

推荐阅读