Every valid column in a CSV file is followed by a column containing NULL: how to drop these columns and reload the CSV

Problem Description

Here is a sample of the CSV data:

"ID", "name", "abbreviation", "CreatedTime", "CreatedByAccount", "UpdatedTime", "UpdatedByAccount", "inc_country_id", "time_zone_id"
"1","NULL","UNITED ARAB EMIRATES"",NULL","AE","NULL","2015-07-01 20:41:49","NULL","379","NULL","2016-03-16 07:38:49","NULL","8215","NULL","262","NULL","9","NULL"

When I try to create a DataFrame with PySpark, the columns do not match up: each data row has twice as many fields as the header, so values shift into the wrong columns. There are more than 600 files with data like this, and I need to read all of them with the correct column mapping.

>>> df=spark.read.csv("s3://xyz.csv",header=True)

>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| ID|name|        abbreviation|CreatedTime|CreatedByAccount|UpdatedTime|   UpdatedByAccount|inc_country_id|time_zone_id|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
|  1|NULL|UNITED ARAB EMIRATES|       NULL|              AE|       NULL|2015-07-01 20:41:49|          NULL|         379|
|  2|NULL|           ARGENTINA|       NULL|              AR|       NULL|2015-07-01 20:41:49|          NULL|         379|
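
A quick way to confirm the shape mismatch (a sketch assuming a local copy of one file; the filename is a placeholder) is to count the fields in the header against the fields in the first data row with Python's csv module:

>>> import csv
>>> rows = list(csv.reader(open('xyz.csv', newline='')))
>>> len(rows[0]), len(rows[1])
(9, 18)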

I tried creating a custom schema and reading the CSV that way, but this would have to be done for all 600+ files, each with different sizes and columns.

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> abc = StructType([
...     StructField('ID', StringType(), True), StructField('c1', StringType(), True),
...     StructField('name', StringType(), True), StructField('c2', StringType(), True),
...     StructField('abbreviation', StringType(), True), StructField('c3', StringType(), True),
...     StructField('CreatedTime', StringType(), True), StructField('c4', StringType(), True),
...     StructField('CreatedByAccount', StringType(), True), StructField('c5', StringType(), True),
...     StructField('UpdatedTime', StringType(), True), StructField('c6', StringType(), True),
...     StructField('UpdatedByAccount', StringType(), True), StructField('c7', StringType(), True),
...     StructField('inc_country_id', StringType(), True), StructField('c8', StringType(), True),
...     StructField('time_zone_id', StringType(), True), StructField('c9', StringType(), True)])
>>> df=spark.read.csv("s3://xyz.csv/",schema=abc)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| ID|  c1|                name|         c2|    abbreviation|         c3|        CreatedTime|            c4|CreatedByAccount|  c5|        UpdatedTime|  c6|UpdatedByAccount|  c7|inc_country_id|  c8|time_zone_id|  c9|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
|  1|NULL|UNITED ARAB EMIRATES|       NULL|              AE|       NULL|2015-07-01 20:41:49|          NULL|             379|NULL|2016-03-16 07:38:49|NULL|            8215|NULL|           262|NULL|           9|NULL|
|  2|NULL|           ARGENTINA|       NULL|              AR|       NULL|2015-07-01 20:41:49|          NULL|             379|NULL|2015-10-28 21:07:47|NULL|             379|NULL|           187|NULL|        None|NULL|
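
With an expanded schema like this in place, the dummy columns can also be dropped by name rather than selecting the valid ones; a one-line sketch using the filler names defined in `abc` above:

>>> df_clean = df.drop('c1', 'c2', 'c3', 'c4', 'c5', 'c6', 'c7', 'c8', 'c9')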

Is there a generic way in PySpark to reload all of these files without the NULL columns?

Tags: csv, pyspark, null

Solution


My solution is to read each file twice: once to get the schema (which is then manipulated), and once for the actual load.

from pyspark.sql import types as T

# keep the original fields so we can `select` them later
df_schema = spark.read.csv('a.csv', header=True)
original_fields = df_schema.schema.fields

# add an extra dummy column after each valid column
expanded_fields = []
for i, field in enumerate(original_fields):
    expanded_fields.append(field)
    expanded_fields.append(T.StructField(f'col_{i}', T.StringType()))

# build a "fake" schema to fit with csv
schema = T.StructType(expanded_fields)

# using "fake" schema to load CSV, then select only valid columns from original fields
df = spark.read.csv('a.csv', header=True, schema=schema).select([field.name for field in original_fields])
df.show()
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | ID|                name|abbreviation|        CreatedTime|CreatedByAccount|        UpdatedTime|UpdatedByAccount|inc_country_id|time_zone_id|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# |  1|UNITED ARAB EMIRATES|          AE|2015-07-01 20:41:49|             379|2016-03-16 07:38:49|            8215|           262|           9|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
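
Since there are 600+ such files, the same two-pass read can be wrapped in a helper and applied per file. A minimal sketch, assuming `spark` is an active SparkSession; the S3 paths below are placeholders:

from pyspark.sql import types as T

def read_without_null_columns(path):
    # first pass: header only, to capture the valid column names
    original_fields = spark.read.csv(path, header=True).schema.fields

    # second pass: expanded schema with a dummy column after each valid one
    expanded_fields = []
    for i, field in enumerate(original_fields):
        expanded_fields.append(field)
        expanded_fields.append(T.StructField(f'col_{i}', T.StringType()))

    return (spark.read.csv(path, header=True, schema=T.StructType(expanded_fields))
                .select([field.name for field in original_fields]))

# placeholder paths; loop over all 600+ files the same way
for path in ['s3://bucket/countries.csv', 's3://bucket/time_zones.csv']:
    read_without_null_columns(path).show()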
