首页 > 解决方案 > 为什么数据框没有在 Spark 中使用“FAILFAST”选项抛出 RunTimeException?

问题描述

我有以下输入文件,其中可能有错误记录,我想抛出异常并识别列名列不是按照我的自定义架构。根据我的理解,即使我们没有对其调用任何操作,数据框也应该立即抛出异常。

1,a,10000,11-03-2019,浦那

2,b,10020,14-03-2019,浦那

3,a,34567,15-03-2019,浦那

tyui,a,fgh-03-2019,浦那

4,b,10020,14-03-2019,浦那

我已经尝试将“FAILFAST”选项设置为 spark 数据框,但它并没有在我的最后抛出任何异常。

我试过下面的代码。

SparkSession ss = SparkSession.builder().appName("Data Quality Frameowrk")
    .master("local")
    .getOrCreate();

    try {
    StructField[] fields = new StructField[5];
    fields[0] = new StructField("id", DataTypes.IntegerType, false,Metadata.empty());
    fields[1] = new StructField("name", DataTypes.StringType, false,Metadata.empty());
    fields[2] = new StructField("salary", DataTypes.DoubleType, false,Metadata.empty());
    fields[3] = new StructField("dob", DataTypes.DateType, false,Metadata.empty());
    fields[4] = new StructField("loc", DataTypes.StringType, false,Metadata.empty());
    StructType customSchema = new StructType(fields);

    ss.read().format("csv")
            .schema(customSchema)
            .option("mode", "FAILFAST")
            .load("C:\\\\Users\\\\manoj.dhake\\\\Downloads\\\\softwares\\\\neo4jdata\\\\employee.csv");


    }catch(Exception e) {
        System.out.println("want to catch column name ,due to which error has been occured");
        e.printStackTrace();
    }

注意:程序应该能够在数据类型不匹配的情况下捕获列名并继续执行流程(不应异常终止)。

标签: apache-sparkdataframe

解决方案


这是因为 Spark 是惰性的,调用时甚至不读取数据load,只处理数据帧才会触发实际读取。根据文档

FAILFAST :遇到损坏的记录时抛出异常。

因此,它与使负载急切无关。可以通过手动触发处理来急切地完成验证,但如果所有条目都有效,它将导致所有数据被处理两次。使用以下方法可以在一定程度上减轻性能影响cache

val df = spark.read
  .schema(StructType(Seq(StructField("test", IntegerType))))
  .option("mode", "FAILFAST")
  .csv(Seq("a").toDS())
  .cache()
df.count()

会抛出

aorg.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.

推荐阅读