首页 > 解决方案 > 使用 PySpark 在组之后将行收集为 Spark 数据帧的数组

问题描述

我有以下df:

url | source | value | name
----------------------------------
a   | USA    | 1     | registry_a
a   | USA    | 1     | registry_b
a   | FRA    | 1     | registry_a
b   | DEU    | 2     | null
b   | DEU    | 1     | registry_b
b   | FRA    | 1     | registry_a
c   | ITA    | 1     | registry_a
c   | ITA    | 0     | registry_b

并且我希望能够按url和分组source并创建一个新列,其中包含一个名为 的数组列中该组的所有行data

url | source | data 
----------------------------------------------------------------------------------
a   | USA    | [{"url": "a", "source": "USA", "value": 1, "name": "registry_a"},
    |        |  {"url": "a", "source": "USA", "value": 1, "name": "registry_b"},
    |        |  {"url": "a", "source": "FRA", "value": 1, "name": "registry_a"}] 
a   | FRA    | [{"url": "a", "source": "FRA", "value": 1, "name": "registry_a"}] 
b   | DEU    | [{"url": "b", "source": "DEU", "value": 2, "name": null},
    |        |  {"url": "b", "source": "DEU", "value": 1, "name": "registry_a"}] 
b   | FRA    | [{"url": "b", "source": "FRA", "value": 1, "name": "registry_a"}] 
c   | ITA    | [{"url": "c", "source": "ITA", "value": 1, "name": "registry_a"},
    |        |  {"url": "c", "source": "ITA", "value": 0, "name": "registry_b"}] 

我试过这个但它不起作用:

samples_to_map_df = (samples
                     .groupBy("url", "source")
                     .agg(F.map_from_arrays(F.collect_list(F.col("url")), 
                                            F.collect_list(F.col("source")),
                                            F.collect_list(F.col("value")),
                                            F.collect_list(F.col("value"))) 
                                           .alias("data"))
)

我收到此错误:

TypeError: map_from_arrays() 接受 2 个位置参数,但给出了 4 个

标签: apache-sparkpysparkapache-spark-sql

解决方案


您需要收集结构列表,然后使用to_json函数来获得所需的输出:

import pyspark.sql.functions as F

samples_to_map_df = samples.groupBy("url", "source").agg(
    F.to_json(
        F.collect_list(
            F.struct(*[F.col(c).alias(c) for c in samples.columns])
        )
    ).alias("data")
)

samples_to_map_df.show(truncate=False)

#+---+------+-----------------------------------------------------------------------------------------------------------------------+
#|url|source|data                                                                                                                   |
#+---+------+-----------------------------------------------------------------------------------------------------------------------+
#|a  |USA   |[{"url":"a","source":"USA","value":"1","name":"registry_a"},{"url":"a","source":"USA","value":"1","name":"registry_b"}]|
#|c  |ITA   |[{"url":"c","source":"ITA","value":"1","name":"registry_a"},{"url":"c","source":"ITA","value":"0","name":"registry_b"}]|
#|b  |DEU   |[{"url":"b","source":"DEU","value":"2"},{"url":"b","source":"DEU","value":"1","name":"registry_b"}]                    |
#|a  |FRA   |[{"url":"a","source":"FRA","value":"1","name":"registry_a"}]                                                           |
#|b  |FRA   |[{"url":"b","source":"FRA","value":"1","name":"registry_a"}]                                                           |
#+---+------+-----------------------------------------------------------------------------------------------------------------------+

推荐阅读