首页 > 解决方案 > pyspark:如何在火花数据框中对 N 条记录进行分组

问题描述

我有一个包含 500 万条记录的 CSV,其结构如下:

+----------+------------+------------+
|  row_id  |    col1    |    col2    |
+----------+------------+------------+
|         1|   value    |    value   |
|         2|   value    |    value   |
|....                                |
|...                                 |
|   5000000|   value    |    value   |
+----------+------------+------------+

我需要将此 CSV 转换为 JSON,每个 json 文件都有 500 条记录和如下特定结构:

{
    "entry": [
        {
            "row_id": "1",
            "col1": "value",
            "col2": "value"
        },
        {
            "row_id": "2",
            "col1": "value",
            "col2": "value"
        },
        ....
        ..
        {
            "row_id": "500",
            "col1": "value",
            "col2": "value"
        }
    ],
    "last_updated":"09-09-2021T01:03:04.44Z"
}

使用PySpark我能够读取 csv 并创建一个数据框。我不知道如何在"entry": [ <500 records> ],"last_updated":"09-09-2021T01:03:04.44Z"
我可以使用的结构的单个 json 中对 500 条记录进行分组,df.coalesce(1).write.option("maxRecordsPerFile",500)但这只会给我 500 条记录的集合,而没有任何结构。我想要列表中的那 500 条记录"entry""last_updated"关注它(我从中获取datetime.now())。

标签: pythonpython-3.xapache-sparkpyspark

解决方案


您可以尝试以下方法:


注意。我使用了以下导入。

from pyspark.sql import functions as F
from pyspark.sql import Window

1 . 我们需要一个可用于将您的数据拆分为 500 个记录批次的列

(推荐)我们可以创建一个伪列来实现这一点row_number

df = df.withColumn("group_num",(F.row_number().over(Window.orderBy("row_id"))-1) % 500 )

否则,如果row_id开始在1500 万条记录中持续增加,我们可以使用

df = df.withColumn("group_num",(F.col("row_id")-1) % 500 )

"last_updated":"09-09-2021T01:03:04.44Z"或者在每批 500 条记录中,该列是唯一的

df = df.withColumn("group_num",F.col("last_updated"))

2 . 我们将通过分组来转换您的数据集group_num

df = (
    df.groupBy("group_num")
      .agg(
          F.collect_list(
              F.expr("struct(row_id,col1,col2)")
          ).alias("entries")
      )
      .withColumn("last_updated",F.lit(datetime.now())))
      .drop("group_num")
)

注意。如果您想包含所有可以使用的列,F.expr("struct(*)")而不是F.expr("struct(row_id,col1,col2)").


3 . 最后,您可以使用该选项写入您的输出/目的地,.option("maxRecordsPerFile",1)因为现在每行最多存储 500 个条目

例如。

df.write.format("json").option("maxRecordsPerFile",1).save("<your intended path here>")

让我知道这是否适合您


推荐阅读