apache-spark - Pyspark - 过滤,分组,聚合列和函数的不同组合
问题描述
我有一个简单的操作要做,Pyspark
但我需要使用许多不同的参数来运行该操作。它只是在一个列上过滤,然后按不同的列分组,然后在第三列上聚合。在Python
中,函数为:
def filter_gby_reduce(df, filter_col = None, filter_value = None):
return df.filter(col(filter_col) == filter_value).groupby('ID').agg(max('Value'))
假设不同的配置是
func_params = spark.createDataFrame([('Day', 'Monday'), ('Month', 'January')], ['feature', 'filter_value'])
我当然可以一个一个地运行这些函数:
filter_gby_reduce(df, filter_col = 'Day', filter_value = 'Monday')
filter_gby_reduce(df, filter_col = 'Month', filter_value = 'January')
但我实际收集的参数要大得多。最后,我还需要将union
所有函数结果合并到一个数据帧中。那么在 spark 中是否有一种方法可以更简洁地编写此代码,并且可以充分利用并行化的优势?
解决方案
一种方法是使用when
and生成所需的值作为列max
,并将这些值传递给agg
. 当你想要联合的值时,你必须使用stack
(没有 DataFrame API,所以selectExpr
使用 a )取消透视结果。根据您的数据集,null
如果过滤器排除所有数据,您可能会得到,如果需要,可以删除这些数据。
我建议测试这个与简单地合并大量过滤数据帧的“天真”方法。
import pyspark.sql.functions as f
func_params = [('Day', 'Monday'), ('Month', 'January')]
df = spark.createDataFrame([
('Monday', 'June', 1, 5),
('Monday', 'January', 1, 2),
('Monday', 'June', 1, 5),
('Monday', 'June', 2, 10)],
['Day', 'Month', 'ID', 'Value'])
cols = []
for column, flt in func_params:
name = f'{column}_{flt}'
val = f.when(f.col(column) == flt, f.col('Value')).otherwise(None)
cols.append(f.max(val).alias(name))
stack = f"stack({len(cols)}," + ','.join(f"'{column}_{flt}', {column}_{flt}" for column, flt in func_params) + ')'
(df
.groupby('ID')
.agg(*cols)
.selectExpr('ID', stack)
.withColumnRenamed('col0', 'param')
.withColumnRenamed('col1', 'Value')
.show()
)
+---+-------------+-----+
| ID| param|Value|
+---+-------------+-----+
| 1| Day_Monday| 5|
| 1|Month_January| 2|
| 2| Day_Monday| 10|
| 2|Month_January| null|
+---+-------------+-----+
推荐阅读
- python - How to replace white space in column? Pandas
- python - Pandas 滚动总和,包括前几天和未来几天
- reactjs - 在 React 中使用一个或多个初始 useEffects?
- macos - 微星机械键盘在 MacBook Pro 上输入错误
- python - 使用python获取图像中的最高点(z)
- python-3.x - tf.reduce_mean 在 tf.function 中不起作用
- asp.net-mvc - 如何在多个网站 OpenIdConnect Azure AD B2C 中登录用户?
- flutter - 错误:在此小部件 Flutter [PropertiesGrid] 上方找不到正确的提供程序
- tcp - 当 RMI TCP 连接打开时,Jmeter 测试停止?
- python - 阅读卡片上的文字