apache-spark - 架构定义 Spark 读取
问题描述
读取具有已定义架构的 CSV 文件,我能够加载文件并进行处理,使用以下代码可以正常工作。模式被定义为严格遵循数据类型来准确记录精度。
source_schema = StructType([
StructField("COL1", StringType(), True),
StructField("COL2", StringType(), True),
StructField("COL3", StringType(), True),
StructField("COL4", StringType(), True),
StructField("COL5", StringType(), True)])
df_raw_file = in_spark.read \
.format("csv") \
.option("delimiter", delimiter) \
.option("header", "false") \
.option("inferSchema", "true") \
.option("columnNameOfCorruptRecord", "BAD_RECORD") \
.schema(source_schema) \
.load(file)
现在我们收到的 CSV 文件将从明年开始省略几列,假设 COL4 今后不会成为文件的一部分。但是我们应该能够处理这两个文件,因为如果需要我们会重新处理旧文件,那么如何处理这样的要求。我可能会阅读 CSV 的示例,使用 df.columns 获取列并比较两个预定义的模式。如果有其他选择,如果我能得到领导,那将会很有帮助。
解决方案
如果您将mode
选项设置为PERMISSIVE
它应该处理您的情况https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv#pyspark.sql.DataFrameReader.csv:
当它遇到标记少于模式长度的记录时,将 null 设置为额外的字段。