首页 > 解决方案 > Pyspark - 过滤,分组,聚合列和函数的不同组合

问题描述

我有一个简单的操作要做,Pyspark但我需要使用许多不同的参数来运行该操作。它只是在一个列上过滤,然后按不同的列分组,然后在第三列上聚合。在Python中,函数为:

def filter_gby_reduce(df, filter_col = None, filter_value = None):
  return df.filter(col(filter_col) == filter_value).groupby('ID').agg(max('Value'))

假设不同的配置是

func_params = spark.createDataFrame([('Day', 'Monday'), ('Month', 'January')], ['feature', 'filter_value'])

我当然可以一个一个地运行这些函数:

filter_gby_reduce(df, filter_col = 'Day', filter_value = 'Monday')
filter_gby_reduce(df, filter_col = 'Month', filter_value = 'January')

但我实际收集的参数要大得多。最后,我还需要将union所有函数结果合并到一个数据帧中。那么在 spark 中是否有一种方法可以更简洁地编写此代码,并且可以充分利用并行化的优势?

标签: apache-sparkpysparkgroup-by

解决方案


一种方法是使用whenand生成所需的值作为列max,并将这些值传递给agg. 当你想要联合的值时,你必须使用stack(没有 DataFrame API,所以selectExpr使用 a )取消透视结果。根据您的数据集,null如果过滤器排除所有数据,您可能会得到,如果需要,可以删除这些数据。

我建议测试这个与简单地合并大量过滤数据帧的“天真”方法。

import pyspark.sql.functions as f
func_params = [('Day', 'Monday'), ('Month', 'January')]

df = spark.createDataFrame([
    ('Monday', 'June', 1, 5), 
    ('Monday', 'January', 1, 2), 
    ('Monday', 'June', 1, 5),
    ('Monday', 'June', 2, 10)], 
    ['Day', 'Month', 'ID', 'Value'])


cols = []
for column, flt in func_params:
    name = f'{column}_{flt}'
    val = f.when(f.col(column) == flt, f.col('Value')).otherwise(None)
    cols.append(f.max(val).alias(name))

stack = f"stack({len(cols)}," + ','.join(f"'{column}_{flt}', {column}_{flt}" for column, flt in func_params) + ')'

(df
    .groupby('ID')
    .agg(*cols)
    .selectExpr('ID', stack)
    .withColumnRenamed('col0', 'param')
    .withColumnRenamed('col1', 'Value')
    .show()
)

+---+-------------+-----+                                                       
| ID|        param|Value|
+---+-------------+-----+
|  1|   Day_Monday|    5|
|  1|Month_January|    2|
|  2|   Day_Monday|   10|
|  2|Month_January| null|
+---+-------------+-----+

推荐阅读