csv - Every valid column in a csv file is followed by a column containing NULL; how do I drop those columns and reload the csv?
Problem
Here is some sample csv data:
"ID", "name", "abbreviation", "CreatedTime", "CreatedByAccount", "UpdatedTime", "UpdatedByAccount", "inc_country_id", "time_zone_id"
"1","NULL","UNITED ARAB EMIRATES"",NULL","AE","NULL","2015-07-01 20:41:49","NULL","379","NULL","2016-03-16 07:38:49","NULL","8215","NULL","262","NULL","9","NULL"
When I try to create a dataframe with pyspark, this causes a column mismatch. There are 600+ files with data like the above, and I need to read all of them with the correct column mapping:
>>> df=spark.read.csv("s3://xyz.csv",header=True)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| ID|name| abbreviation|CreatedTime|CreatedByAccount|UpdatedTime| UpdatedByAccount|inc_country_id|time_zone_id|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+------------+
| 1|NULL|UNITED ARAB EMIRATES| NULL| AE| NULL|2015-07-01 20:41:49| NULL| 379|
| 2|NULL| ARGENTINA| NULL| AR| NULL|2015-07-01 20:41:49| NULL| 379|
I tried creating a custom schema and reading the csv file that way, but this would have to be repeated for 600+ files with different sizes and columns:
>>> from pyspark.sql.types import StructType, StructField, StringType
>>> abc=StructType([StructField('ID',StringType(),True),StructField('c1',StringType(),True),StructField('name',StringType(),True),StructField('c2',StringType(),True),StructField('abbreviation',StringType(),True),StructField('c3',StringType(),True),StructField('CreatedTime',StringType(),True),StructField('c4',StringType(),True),StructField('CreatedByAccount',StringType(),True),StructField('c5',StringType(),True),StructField('UpdatedTime',StringType(),True),StructField('c6',StringType(),True),StructField('UpdatedByAccount',StringType(),True),StructField('c7',StringType(),True),StructField('inc_country_id',StringType(),True),StructField('c8',StringType(),True),StructField('time_zone_id',StringType(),True),StructField('c9',StringType(),True)])
>>> df=spark.read.csv("s3://xyz.csv/",schema=abc,header=True)
>>> df.show()
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| ID| c1| name| c2| abbreviation| c3| CreatedTime| c4|CreatedByAccount| c5| UpdatedTime| c6|UpdatedByAccount| c7|inc_country_id| c8|time_zone_id| c9|
+---+----+--------------------+-----------+----------------+-----------+-------------------+--------------+----------------+----+-------------------+----+----------------+----+--------------+----+------------+----+
| 1|NULL|UNITED ARAB EMIRATES| NULL| AE| NULL|2015-07-01 20:41:49| NULL| 379|NULL|2016-03-16 07:38:49|NULL| 8215|NULL| 262|NULL| 9|NULL|
| 2|NULL| ARGENTINA| NULL| AR| NULL|2015-07-01 20:41:49| NULL| 379|NULL|2015-10-28 21:07:47|NULL| 379|NULL| 187|NULL| None|NULL|
Is there a generic way, using pyspark, to reload all of these files without the NULL columns?
Solution
My solution is to read each file twice: once to get the schema (and then manipulate it), and once for the actual read.
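The schema manipulation itself is plain list interleaving, independent of Spark; a minimal sketch of the idea (the names here are illustrative only):

```python
# Interleave a dummy name after every real column name, mirroring
# the layout of the padded csv rows.
def expand(names):
    out = []
    for i, name in enumerate(names):
        out.append(name)        # the real column
        out.append(f'col_{i}')  # dummy slot for the literal "NULL" field
    return out

print(expand(['ID', 'name']))  # ['ID', 'col_0', 'name', 'col_1']
```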
import pyspark.sql.types as T

# keep the original fields so we can `select` them later
df_schema = spark.read.csv('a.csv', header=True)
original_fields = df_schema.schema.fields
# add a dummy column after each valid column
expanded_fields = []
for i, field in enumerate(original_fields):
expanded_fields.append(field)
expanded_fields.append(T.StructField(f'col_{i}', T.StringType()))
# build a "fake" schema that matches the actual csv layout
schema = T.StructType(expanded_fields)
# load the csv with the "fake" schema, then keep only the valid original columns
df = (spark.read.csv('a.csv', header=True, schema=schema)
      .select([field.name for field in original_fields]))
df.show()
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | ID| name|abbreviation| CreatedTime|CreatedByAccount| UpdatedTime|UpdatedByAccount|inc_country_id|time_zone_id|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
# | 1|UNITED ARAB EMIRATES| AE|2015-07-01 20:41:49| 379|2016-03-16 07:38:49| 8215| 262| 9|
# +---+--------------------+------------+-------------------+----------------+-------------------+----------------+--------------+------------+
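If Spark is not strictly required for the cleanup itself, the files could also be rewritten one at a time with the standard csv module before loading. A minimal sketch, assuming (as in the sample) each data row carries a literal "NULL" field after every real field while the header row does not:

```python
import csv
import io

def strip_dummy_columns(src_text):
    """Drop the extra field that follows each real field in every data row."""
    reader = csv.reader(io.StringIO(src_text))
    header = next(reader)  # the header has only the real column names
    # keep only the even-indexed fields: positions 0, 2, 4, ...
    cleaned = [header] + [row[::2] for row in reader]
    buf = io.StringIO()
    csv.writer(buf).writerows(cleaned)
    return buf.getvalue()

sample = '"ID","name"\n"1","NULL","ARGENTINA","NULL"\n'
print(strip_dummy_columns(sample))  # prints the two cleaned rows
```

Running this over the 600+ files (paths omitted here) would produce csvs that `spark.read.csv(..., header=True)` can then load directly, at the cost of an extra rewrite pass.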