首页 > 解决方案 > 架构定义 Spark 读取

问题描述

读取具有已定义架构的 CSV 文件,我能够加载文件并进行处理,使用以下代码可以正常工作。模式被定义为严格遵循数据类型来准确记录精度。

source_schema = StructType([
            StructField("COL1", StringType(), True),
            StructField("COL2", StringType(), True),
            StructField("COL3", StringType(), True),
            StructField("COL4", StringType(), True),
            StructField("COL5", StringType(), True)])

df_raw_file = in_spark.read \
            .format("csv") \
            .option("delimiter", delimiter) \
            .option("header", "false") \
            .option("inferSchema", "true") \
            .option("columnNameOfCorruptRecord", "BAD_RECORD") \
            .schema(source_schema) \
            .load(file)

现在我们收到的 CSV 文件将从明年开始省略几列,假设 COL4 今后不会成为文件的一部分。但是我们应该能够处理这两个文件,因为如果需要我们会重新处理旧文件,那么如何处理这样的要求。我可能会阅读 CSV 的示例,使用 df.columns 获取列并比较两个预定义的模式。如果有其他选择,如果我能得到领导,那将会很有帮助。

标签: apache-sparkpysparkapache-spark-sql

解决方案


如果您将mode选项设置为PERMISSIVE它应该处理您的情况https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv#pyspark.sql.DataFrameReader.csv

当它遇到标记少于模式长度的记录时,将 null 设置为额外的字段。


推荐阅读