apache-spark - 在 pyspark 中处理具有多种记录类型的单个文件
问题描述
我的数据看起来有点像以下(data.txt
):
01,"Alice","The Cat"
02,Yes
03,2000,01,01
01,"Bob","The Dog"
02,No
03,2001,01,04
每行的前两个字符给出 a record_type
,然后确定该行的模式。数据是不可预测的——每个“块”可以有多个或没有特定记录类型的实例。该文件是一个.csv
符合条件的文件"
,实际上有数百万条记录。
我希望将这种文件类型使用pyspark
, 分成多个文件 - 每个记录类型一个。我已经尝试过使用rdd
循环方法,但感觉可能有另一种更好的方法可用。目前这需要 30 分钟来生成所有文件。
import csv
record_types = ["01", "02", "03"]
rdd = sc.textFile("data.txt")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
for r in record_types:
records = rdd.filter(lambda x: x[0] == r)
sdf = spark.createDataFrame(records)
sdf.write.mode('overwrite').csv(f"{r}.csv")
解决方案
创建一个 Dataframe,其中 record_type 作为第一列,所有剩余的作为第二列,如下所示,
input_df=spark.createDataFrame(sc.textFile("test.txt").map(lambda x : tuple([x[0:2], x[3:]])))
+-----------+-----------------+
| _1 | _2 |
+-----------+-----------------+
| 01|"Alice","The Cat"|
| 02| Yes|
| 03| 2000,01,01|
| 01| "Bob","The Dog"|
| 02| No|
| 03| 2001,01,04|
+-----------+-----------------+
然后,根据类型过滤行并存储它们。
types=["01","02","03"]
for type in types:
input_df.filter(col("_1") == type).write.mode('overwrite').csv(f"{type}_rows.csv")
推荐阅读
- unity3d - Player Car 运行时缩放问题
- android - Xamarin Android 应用程序在发布模式下启动时崩溃在调试模式下工作正常
- dart - Flutter:如果 String-Variable 从其他方法更改,则更改文本字段
- linux - 使用 exec、shell_exec 和反引号运行 ionic Cordova 命令并返回空路径
- python - 如何使用python执行这个sql查询
- python - PyCUDA在使用多个块处理矩阵运算时,为什么矩阵大小必须能被块大小整除?
- git - 基于 git commit ID 构建 docker 镜像等的 Jenkins CI/CD 流程
- html - 如何使用物化创建面向列的布局?
- php - 每个新月重置倒计时
- python - 在同一行的 Jupyter 上交替使用 Shell 和 Python 命令