How can I raise an error with an if-else statement when values in DataFrame columns are NULL?

Problem description

I am reading a CSV file into a DataFrame, and the last 3 mandatory columns contain NULL values.

Can anyone guide me on how to use an if-else statement inside a UDF in Scala Spark, so that if any of these columns contain a null value it throws an error saying "Mandatory fields cannot be null"? I have already written the code in Scala Spark, so any suggestions would be very helpful.

This is my first program, so please excuse my mistakes. Starting from the code below, please guide me on how to validate the details so that the resulting DF can be inserted into a database. Please share your suggestions:

import java.util.Date
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions._


object InputValidation {

  val conf: SparkConf = new SparkConf()
    .setAppName("Excel to DataFrame")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")

  val spark: SparkSession = SparkSession.builder()
    .appName("Excel to DataFrame")
    .config("spark.master", "local")
    .getOrCreate()




 val structType: StructType = {
    val sno = StructField("S.No", IntegerType, nullable = true)
    val fname = StructField("Firm Name", StringType, nullable = true)
    val address = StructField("Address", StringType, nullable = true)
    val country = StructField("Country", StringType, nullable = true)
    val pcode = StructField("Post Code", IntegerType, nullable = true)
    val tnumber = StructField("Telephone Number", IntegerType, nullable = true)
    val waddress = StructField("Web Address", StringType, nullable = true)
    val mail = StructField("Mail ID", StringType, nullable = true)
    val fstatus = StructField("Firm Status", StringType, nullable = false)
    val btype = StructField("Business Type", StringType, nullable = false)
    val edate = StructField("Effective Date", DateType, nullable = false)
    new StructType(Array(sno, fname, address, country, pcode, tnumber, waddress, mail,
      fstatus, btype, edate))
  }
  def main(args: Array[String]): Unit = {
  val inputDF: DataFrame = spark.read
      .schema(structType)
      .option("header", "true")
      .option("delimiter", ",")
      .csv("G:\\CSV\\FirmRegistration.csv")


def isValidUDF: Object = udf({
  (fstatus: String, btype: String, edate: Date) => {
    val validfs = Seq("New", "Authorised", "EEA Authorised", "Cancelled")
    if (validfs.contains(fstatus)) {
      return fstatus
    }
    else {
      throw new Exception("Incorrect firm status")
    }

    val validbt = Seq("Regulated", "PSD", "EEA")
    if (validbt.contains(btype)) {
      return btype
    }
    else {
      throw new Exception("Incorrect firm business type")
    }

    if (edate != null) {
      return edate
    }
    else {
      throw new Exception("Effective date cannot be NULL")
    }
  }
})

  val userDF = udf(isValidUDF _)
  val resultDF = inputDF.withColumn("IsValid", userDF())
  resultDF.show()

//Load the result as a table into Database
    val driver = "org.postgresql.Driver"
    val url = "jdbc:postgresql://localhost:5432/rtjvm"
    val user = "docker"
    val password = "docker"

    inputDF.write
      .format("jdbc")
      .option("driver",driver)
      .option("url",url)
      .option("user",user)
      .option("password",password)
      .option("dbtable","public.input")
      .save()

}

}

If I run the above program, I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type Object is not supported
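Spark infers a UDF's SQL return type from the Scala function's declared result type; `isValidUDF` is declared as `Object`, which has no corresponding Spark SQL type, hence this exception. As a sketch, a UDF with a concrete result type registers fine (the name `isValidStatus` is an assumption, not from the original code):

```scala
import org.apache.spark.sql.functions.udf

// Sketch: a UDF whose Scala result type is concrete (Boolean), so Spark can
// derive the SQL schema (BooleanType) instead of failing on Object.
val isValidStatus = udf { (fstatus: String) =>
  Seq("New", "Authorised", "EEA Authorised", "Cancelled").contains(fstatus)
}
```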

Tags: scala, dataframe, apache-spark, user-defined-functions

Solution


def checkNullUDF = udf({
  (firmStatus: String, businessType: String, effectiveDate: String) =>
    firmStatus == null || businessType == null || effectiveDate == null
})

df.withColumn("IsNull", checkNullUDF($"Firm Status", $"Business Type", $"Effective Date"))

This gives you an "IsNull" column with true/false values, which you can use to raise an exception accordingly.
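To actually raise the error the question asks for, one option is to keep the null check as a plain function and throw once after checking. A minimal sketch (the helper names `hasNullMandatory`/`validate` and the exception type are assumptions, not part of the original answer):

```scala
// Same null check as checkNullUDF, kept as a plain function so it can be
// reused and tested without a SparkSession; the UDF would just wrap it.
def hasNullMandatory(firmStatus: String, businessType: String, effectiveDate: String): Boolean =
  firmStatus == null || businessType == null || effectiveDate == null

// Throws the error message the question asks for when a mandatory field is null.
def validate(firmStatus: String, businessType: String, effectiveDate: String): Unit =
  if (hasNullMandatory(firmStatus, businessType, effectiveDate))
    throw new IllegalArgumentException("Mandatory fields cannot be null")
```

On the DataFrame side, it is usually better to filter on the "IsNull" column and throw once from the driver (for example, when the count of flagged rows is non-zero) than to throw inside the UDF itself, since exceptions thrown inside a UDF surface only as opaque task failures.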

