首页 > 解决方案 > 从 csv 加载数据帧,丢弃所有不遵循模式的数据

问题描述

我正在尝试从 csv 文件中读取并将其加载到数据框中。现在,我尝试了以下方法。

    val schema = StructType(Seq(
      StructField("key", StringType, true),
      StructField("value", DoubleType, false)
    ))
    val df = sc.read.schema(schema)
      .option("mode", "DROPMALFORMED")
      .csv("C:\\Users\\raam\\IdeaProjects\\myPlayGround\\abc.csv")
    df.show()

我的,csv 看起来像这样

qqq
a,1
b,2.2
c,xyz
d,4.5
e,asfsdfsdf 
dd,
f,3.1
2,
,6.6

我的输出是

+----+-----+
| key|value|
+----+-----+
|   a|  1.0|
|   b|  2.2|
|   d|  4.5|
|  dd| null|
|   f|  3.1|
|   2| null|
|null|  6.6|
+----+-----+

在第二个StrcutField中,可为空的为假,为什么我的 df 中的第 4 行和第 6 行?另外,他们是否可以在读取时创建两个单独的 df 以便我可以将丢弃的行放在单独的文件中?

标签: csvapache-sparkapache-spark-sql

解决方案


DROPMALFORMED 模式确实会删除不符合架构的记录(在您的情况下,记录的值为非数字),但是它忽略了可空性。查看这些问题:SPARK-10848SPARK-25545这个 pull request以了解其基本原理。如您所见,它不受欢迎,有些人试图修复它,但到目前为止还没有成功。

然后,您需要显式处理您身边的空记录。

val schema = StructType(Seq(
  StructField("key", StringType, true),
  StructField("value", DoubleType, false),
  StructField("corruptRecord", StringType, true)
))

val df = spark.read.schema(schema).
  option("mode", "PERMISSIVE").
  option("columnNameOfCorruptRecord", "corruptRecord").
  csv("abc.csv")

def isValid = 'corruptRecord.isNull && 'value.isNotNull
val validDf = df.filter(isValid)
val invalidDf = df.filter(!isValid)

isValid可以用更通用的方式定义:

def isValid = schema.filter(!_.nullable)
  .foldLeft('corruptRecord.isNull)((acc, f) => acc && col(f.name).isNotNull)

至少当您没有嵌套结构/数组时。


推荐阅读