apache-spark - PySpark 如何读取具有不同列的 csv 文件?
问题描述
我正在从不同的提供商加载 CSV 文件
s3://data-bucket/sources/provider-1.csv
root
|-- name: string (nullable = true)
|-- sid: string (nullable = true)
|-- pid: string (nullable = true)
|-- gender: string (nullable = true)
|-- price: string (nullable = true)
|-- currency: string (nullable = true)
s3://data-bucket/sources/provider-2.csv
root
|-- sid: string (nullable = true)
|-- pid: string (nullable = true)
|-- currency: string (nullable = true)
|-- name: string (nullable = true)
|-- gender: string (nullable = true)
|-- price: string (nullable = true)
s3://data-bucket/sources/provider-3.csv
root
|-- color: string (nullable = true)
|-- sid: string (nullable = true)
|-- currency: string (nullable = true)
|-- name: string (nullable = true)
|-- price: string (nullable = true)
不同的提供者有不同的列和不同的顺序。
schema_list = []
for field in ['sid', 'pid', 'name', 'price', 'currency', 'color', 'gender']:
schema_list.append(StructField(field, StringType(), True))
schema = StructType(schema_list)
df = spark.read.csv(
header=True,
schema=schema,
sep="\t",
quote="\"",
escape="\"",
path='s3://data-bucket/sources/*.csv',
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
)
df.write.parquet('s3://data-bucket/output/', mode="overwrite")
基本上我想在这里做的是规范化所有提供者的列,并输出为镶木地板格式。但是输出数据混乱。
有什么方法可以加载具有不同列的多个文件?
解决方案
推荐阅读
- python - 数组的分割部分
- ios - 带有 UISegmentedControl 和 childViewController 的 iOS LargeTitle(在容器中)
- java - 如何在 Jersey/Dropwizard 中获取变量列表 JSON 正文?
- mysql - 在mysql中创建过程时出现语法错误
- c - 通过自定义排序/删除链表中的重复项跳过最后一项
- python - 在 Python 中重塑数据集
- javascript - Slick Slider - 将图像加载到幻灯片中的 API
- python - 如果一个元组是另一个元组的子集,如何有效地查找
- vb.net - 使用 VB.net 中的 Google Cloud 服务向 Google Drive 上传/下载文件
- ios - 如何在应用商店的产品页面中编辑语言信息?