首页 > 解决方案 > PySpark 如何读取具有不同列的 csv 文件?

问题描述

我正在从不同的提供商加载 CSV 文件

s3://data-bucket/sources/provider-1.csv

root
 |-- name: string (nullable = true)
 |-- sid: string (nullable = true)
 |-- pid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- price: string (nullable = true)
 |-- currency: string (nullable = true)

s3://data-bucket/sources/provider-2.csv

root
 |-- sid: string (nullable = true)
 |-- pid: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- price: string (nullable = true)

s3://data-bucket/sources/provider-3.csv

root
 |-- color: string (nullable = true)
 |-- sid: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: string (nullable = true)

不同的提供者有不同的列和不同的顺序。

schema_list = []
for field in ['sid', 'pid', 'name', 'price', 'currency', 'color', 'gender']:
    schema_list.append(StructField(field, StringType(), True))
schema = StructType(schema_list)

df = spark.read.csv(
    header=True,
    schema=schema,
    sep="\t",
    quote="\"",
    escape="\"",
    path='s3://data-bucket/sources/*.csv',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
)
df.write.parquet('s3://data-bucket/output/', mode="overwrite")

基本上我想在这里做的是规范化所有提供者的列,并输出为镶木地板格式。但是输出数据混乱。

有什么方法可以加载具有不同列的多个文件?

标签: apache-sparkpysparkparquetamazon-emr

解决方案


推荐阅读