apache-spark - 为什么数据框没有在 Spark 中使用“FAILFAST”选项抛出 RunTimeException?
问题描述
我有以下输入文件,其中可能有错误记录,我想抛出异常并识别列名列不是按照我的自定义架构。根据我的理解,即使我们没有对其调用任何操作,数据框也应该立即抛出异常。
1,a,10000,11-03-2019,浦那
2,b,10020,14-03-2019,浦那
3,a,34567,15-03-2019,浦那
tyui,a,fgh-03-2019,浦那
4,b,10020,14-03-2019,浦那
我已经尝试将“FAILFAST”选项设置为 spark 数据框,但它并没有在我的最后抛出任何异常。
我试过下面的代码。
SparkSession ss = SparkSession.builder().appName("Data Quality Frameowrk")
.master("local")
.getOrCreate();
try {
StructField[] fields = new StructField[5];
fields[0] = new StructField("id", DataTypes.IntegerType, false,Metadata.empty());
fields[1] = new StructField("name", DataTypes.StringType, false,Metadata.empty());
fields[2] = new StructField("salary", DataTypes.DoubleType, false,Metadata.empty());
fields[3] = new StructField("dob", DataTypes.DateType, false,Metadata.empty());
fields[4] = new StructField("loc", DataTypes.StringType, false,Metadata.empty());
StructType customSchema = new StructType(fields);
ss.read().format("csv")
.schema(customSchema)
.option("mode", "FAILFAST")
.load("C:\\\\Users\\\\manoj.dhake\\\\Downloads\\\\softwares\\\\neo4jdata\\\\employee.csv");
}catch(Exception e) {
System.out.println("want to catch column name ,due to which error has been occured");
e.printStackTrace();
}
注意:程序应该能够在数据类型不匹配的情况下捕获列名并继续执行流程(不应异常终止)。
解决方案
这是因为 Spark 是惰性的,调用时甚至不读取数据load
,只处理数据帧才会触发实际读取。根据文档
FAILFAST :遇到损坏的记录时抛出异常。
因此,它与使负载急切无关。可以通过手动触发处理来急切地完成验证,但如果所有条目都有效,它将导致所有数据被处理两次。使用以下方法可以在一定程度上减轻性能影响cache
:
val df = spark.read
.schema(StructType(Seq(StructField("test", IntegerType))))
.option("mode", "FAILFAST")
.csv(Seq("a").toDS())
.cache()
df.count()
会抛出
aorg.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
推荐阅读
- node.js - 异步读取文件
- python - 使用并行化处理大型 Numpy python 图像
- swift - SwiftUI .dateComponents 今天而不是明天
- firebase - 如果用户存在 Google 登录,但不存在电子邮件/密码,想要处理 firebase 身份验证错误
- javascript - Snipcart 中出现“购物车确认”错误
- r - 随机重新编码每行中值的第一个和第二个实例?
- php - Laravel 过滤关系数据
- symfony - 会话在 Symfony 中使用 phpunit 启动后,我无法设置会话 ID
- asp.net-core - 如何在 vs 代码中安装 Microsoft.EntityFrameworkCore 包
- python - Python 问题:无法在单独的线程或事件循环中运行连接 aiohttp 服务器