首页 > 解决方案 > 在 pyspark 中处理具有多种记录类型的单个文件

问题描述

我的数据看起来有点像以下(data.txt):

01,"Alice","The Cat"
02,Yes
03,2000,01,01
01,"Bob","The Dog"
02,No
03,2001,01,04

每行的前两个字符给出 a record_type,然后确定该行的模式。数据是不可预测的——每个“块”可以有多个或没有特定记录类型的实例。该文件是一个.csv符合条件的文件",实际上有数百万条记录。

我希望将这种文件类型使用pyspark, 分成多个文件 - 每个记录类型一个。我已经尝试过使用rdd循环方法,但感觉可能有另一种更好的方法可用。目前这需要 30 分钟来生成所有文件。

import csv
record_types = ["01", "02", "03"]
rdd = sc.textFile("data.txt")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

for r in record_types:
  records = rdd.filter(lambda x: x[0] == r)
  sdf = spark.createDataFrame(records)
  sdf.write.mode('overwrite').csv(f"{r}.csv")

标签: apache-sparkpysparkrdd

解决方案


创建一个 Dataframe,其中 record_type 作为第一列,所有剩余的作为第二列,如下所示,

input_df=spark.createDataFrame(sc.textFile("test.txt").map(lambda x : tuple([x[0:2], x[3:]])))

+-----------+-----------------+
|    _1     |       _2        | 
+-----------+-----------------+
|         01|"Alice","The Cat"|
|         02|              Yes|
|         03|       2000,01,01|
|         01|  "Bob","The Dog"|
|         02|               No|
|         03|       2001,01,04|
+-----------+-----------------+

然后,根据类型过滤行并存储它们。

types=["01","02","03"]
for type in types:
    input_df.filter(col("_1") == type).write.mode('overwrite').csv(f"{type}_rows.csv")

推荐阅读