首页 > 解决方案 > PySpark - 获取数据框中动态列的聚合值

问题描述

我有一个包含以下行的数据框:

+------+--------+-------+-------+
| label| machine| value1| value2|
+------+--------+-------+-------+
|label1|machine1|     13|    7.5|
|label1|machine1|     9 |    7.5|
|label1|machine1|    8.5|    7.5|
|label1|machine1|   10.5|    7.5|
|label1|machine1|     12|      8|
|label1|machine2|     8 |   13.5|
|label1|machine2|     18|     10|
|label1|machine2|     10|     14|
|label1|machine2|     9 |   10.5|
|label1|machine2|    8.5|     10|
|label2|machine3|     8 |    7.5|
|label2|machine3|     18|    7.5|
|label2|machine3|     10|    7.5|
|label2|machine3|     9 |    7.5|
|label2|machine3|    8.5|      8|
|label2|machine4|   13.5|     13|
|label2|machine4|     10|      9|
|label2|machine4|     14|    8.5|
|label2|machine4|   10.5|   10.5|
|label2|machine4|     10|     12|
+------+--------+-------+-------+

value1, value2在这里,我可以在数据框中有多个值列。对于每一列,我想聚合这些值collect_list并在数据框中创建一个新列,以便稍后执行一些功能。

为此,我尝试这样:

my_df = my_df.groupBy(['label', 'machine']). \
     agg(collect_list("value1").alias("col_value1"), collect_list("value2").alias("col_value2"))

label当我按列分组时,它给了我以下 4 行machine

+------+--------+--------------------+--------------------+
| label| machine|    collected_value1|    collected_value2|
+------+--------+--------------------+--------------------+
|label1|machine1|[13.0, 9.0, 8.5, ...|[7.5, 7.5, 7.5, 7...|
|label2|machine2|[8.0, 18.0, 10.0,...|[13.5, 10.0, 14, ...|
|label1|machine3|[8.0, 18.0, 10.0,...|[7.5, 7.5, 7.5, 7...|
|label2|machine4|[13.5, 10.0, 14, ...|[13.0, 9.0, 8.5, ...|
+------+--------+--------------------+--------------------+

现在,我的问题是如何将列动态传递给该组。每次运行的列可能会有所不同,所以我想使用这样的东西:

df_cols = ['value1', 'value2']

my_df = my_df.groupBy(['label', 'machine']). \
    agg(collect_list(col_name).alias(str(col_name+"_collected")) for col_name in df_cols)

它给了我AssertionError: all exprs should be Column错误。

我怎样才能做到这一点?有人可以帮我吗?

提前致谢。

标签: pysparkpyspark-dataframes

解决方案


下面的代码已经奏效。谢谢你。

exprs = [collect_list(x).alias(str(x+"_collected")) for x in df_cols]
my_df = my_df.groupBy(['label', 'machine']).agg(*exprs)

推荐阅读