python - pyspark:如何在火花数据框中对 N 条记录进行分组
问题描述
我有一个包含 500 万条记录的 CSV,其结构如下:
+----------+------------+------------+
| row_id | col1 | col2 |
+----------+------------+------------+
| 1| value | value |
| 2| value | value |
|.... |
|... |
| 5000000| value | value |
+----------+------------+------------+
我需要将此 CSV 转换为 JSON,每个 json 文件都有 500 条记录和如下特定结构:
{
"entry": [
{
"row_id": "1",
"col1": "value",
"col2": "value"
},
{
"row_id": "2",
"col1": "value",
"col2": "value"
},
....
..
{
"row_id": "500",
"col1": "value",
"col2": "value"
}
],
"last_updated":"09-09-2021T01:03:04.44Z"
}
使用PySpark我能够读取 csv 并创建一个数据框。我不知道如何在"entry": [ <500 records> ],"last_updated":"09-09-2021T01:03:04.44Z"
我可以使用的结构的单个 json 中对 500 条记录进行分组,df.coalesce(1).write.option("maxRecordsPerFile",500)
但这只会给我 500 条记录的集合,而没有任何结构。我想要列表中的那 500 条记录"entry"
并"last_updated"
关注它(我从中获取datetime.now()
)。
解决方案
您可以尝试以下方法:
注意。我使用了以下导入。
from pyspark.sql import functions as F
from pyspark.sql import Window
1 . 我们需要一个可用于将您的数据拆分为 500 个记录批次的列
(推荐)我们可以创建一个伪列来实现这一点row_number
df = df.withColumn("group_num",(F.row_number().over(Window.orderBy("row_id"))-1) % 500 )
否则,如果row_id
开始在1
500 万条记录中持续增加,我们可以使用
df = df.withColumn("group_num",(F.col("row_id")-1) % 500 )
"last_updated":"09-09-2021T01:03:04.44Z"
或者在每批 500 条记录中,该列是唯一的
df = df.withColumn("group_num",F.col("last_updated"))
2 . 我们将通过分组来转换您的数据集group_num
df = (
df.groupBy("group_num")
.agg(
F.collect_list(
F.expr("struct(row_id,col1,col2)")
).alias("entries")
)
.withColumn("last_updated",F.lit(datetime.now())))
.drop("group_num")
)
注意。如果您想包含所有可以使用的列,F.expr("struct(*)")
而不是F.expr("struct(row_id,col1,col2)")
.
3 . 最后,您可以使用该选项写入您的输出/目的地,.option("maxRecordsPerFile",1)
因为现在每行最多存储 500 个条目
例如。
df.write.format("json").option("maxRecordsPerFile",1).save("<your intended path here>")
让我知道这是否适合您
推荐阅读
- go - golang.org 上的搜索功能是什么?
- django - 没有产品与给定的查询匹配 Django
- hibernate - Hibernate 返回最后一次获取和修改的对象,而不是获取新的
- python - 如何在 Visual Studio Code 上正确设置 python?我的调试工作不正常
- django - django中的重复对象,也重复子对象(相关对象)
- c++ - 对 unique_ptrs 列表进行排序
- java - fork/join,输入数组需要同步吗?
- python - 如何在python中旋转数据框
- html - 带有“display:flex”的引导程序“.row”类导致“text-align:center”不适用于“p”标签
- android - Android Studio 标准列表视图:E/AndroidRuntime: FATAL EXCEPTION: AsyncTask #1