首页 > 解决方案 > 使用scala中的spark数据帧处理csv文件中的空值(StructType)

问题描述

我正在尝试使用我自己定义的模式将 csv 文件加载到数据框中。我所需要的只是在将其作为数据框加载时识别不良数据。

在我看来,坏数据是

  1. 当架构告诉整数时,输入包含字符串
  2. 意外/错误的分隔符
  3. 当我的架构说 nullable => false 时,任何列数据如下所示
    1. 无效的
    2. ''
    3. //没有什么

我正在使用额外的列“_corrupt_record”列来重定向具有上述 3 种情况的记录。

我可以看到在案例 1) 和案例 2) 期间填充了这个额外的列,但是当数据为 Null(案例 3.3)时,记录不会重定向到这个额外的列。它适用于 3.1 和 3.2。

我在哪里做错了?

您可以向我建议您在实时项目中使用的任何其他方式来处理/将不良数据重定向到文件中,同时将原始文件加载到数据帧中。

输入文件 Products.txt 架构:产品(product_id、product_name、product_type、product_version、product_price)

代码:

val spark= new sql.SparkSession.Builder().master("local[*]").getOrCreate()

val products_schema= StructType(List
  (
  StructField("product_id",IntegerType,false),
  StructField("product_name",StringType,false),
  StructField("product_type",StringType,true),
  StructField("product_version",StringType,true),
  StructField("product_price",StringType,true),
  StructField("_corrupt_record",StringType,true)
  )
)
val products_Staging_df=spark.read.option("header", false).option("delimiter", "|").schema(products_schema).csv("C:\\Users\\u6062310\\Desktop\\DBS\\Product.txt")


products_Staging_df.printSchema()
products_Staging_df.show()`

我放了一些不良记录,当我使用 df.show() 时,我希望 product_id = 的记录也位于 _corrupt_record 列下。但它不会来。

只有 Null 和 '' 工作正常。如何处理空白?

标签: scalaapache-sparkhadoopapache-spark-sqlbigdata

解决方案


推荐阅读