首页 > 解决方案 > 如何在 Spark 数据集中抛出异常

问题描述

我正在通过 Spark (java) 加载 csv 文件

Dataset<Row> dataset = sparkSession.read().option("header", "true").csv("/test.csv");

这是文件的架构:

dataset.printSchema();
root
    |-- eid: string (nullable = true)
    |-- name: string (nullable = true)
    |-- salary: string (nullable = true)
    |-- designation: string (nullable = true)

这是样本数据:

dataset.show();

+-----+------------------+------------+-----------+
|  eid|              name|      salary|designation|
+-----+------------------+------------+-----------+
|    1|            "John"|     "10000"|       "SE"|
|    2|             "Dan"|    "100000"|       "SE"|
|    3|         "ironman"|  "10000000"|     "King"|
|    4|          "Batman"| "100000000"|  "Fighter"|
|awqwq| "captain america"|    "300000"|  "Captain"|
+-----+------------------+------------+-----------+

转换为整数类型

dataset = dataset.withColumn("eid", dataset.col("eid").cast(DataTypes.IntegerType));
dataset.show();

+----+------------------+------------+-----------+
| eid|              name|      salary|designation|
+----+------------------+------------+-----------+
|   1|            "John"|     "10000"|       "SE"|
|   2|             "Dan"|    "100000"|       "SE"|
|   3|         "ironman"|  "10000000"|     "King"|
|   4|          "Batman"| "100000000"|  "Fighter"|
|null| "captain america"|    "300000"|  "Captain"|
+----+------------------+------------+-----------+

但是在 eid 列中的值转换后变为空(字符串值)。它没有抛出任何铸造异常。

有什么办法可以抛出异常。我有大量的列,需要抛出异常

标签: javaapache-sparkapache-spark-dataset

解决方案


可能最流畅的方法是避免强制转换和使用带有预定义模式的故障快速读取模式:

spark.read
  .schema("eid INT, name STRING, salary STRING, designation STRING")
  .option("mode", "FAILFAST")
  .option("header", true)
  .csv("/test.csv")
  .show()

这抛出:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.

如果由于某种原因不能使用这种方法,可以使用数据集 API 完成任意转换和其他操作。这个例子是用 Scala 编写的,但也可以用 Java 编写:

spark.read
  .option("header", true)
  .csv("/test.csv")
  .as[(String, String, String, String)]
  .map {
    case (eid, name, salary, designation) => (eid.toInt, name, salary, designation)
  }
  .show()

投掷

java.lang.NumberFormatException: For input string: "awqwq"

或者,也可以使用 UDF。


推荐阅读