csv - 从 csv 加载数据帧,丢弃所有不遵循模式的数据
问题描述
我正在尝试从 csv 文件中读取并将其加载到数据框中。现在,我尝试了以下方法。
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, false)
))
val df = sc.read.schema(schema)
.option("mode", "DROPMALFORMED")
.csv("C:\\Users\\raam\\IdeaProjects\\myPlayGround\\abc.csv")
df.show()
我的,csv 看起来像这样
qqq
a,1
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
dd,
f,3.1
2,
,6.6
我的输出是
+----+-----+
| key|value|
+----+-----+
| a| 1.0|
| b| 2.2|
| d| 4.5|
| dd| null|
| f| 3.1|
| 2| null|
|null| 6.6|
+----+-----+
在第二个StrcutField
中,可为空的为假,为什么我的 df 中的第 4 行和第 6 行?另外,他们是否可以在读取时创建两个单独的 df 以便我可以将丢弃的行放在单独的文件中?
解决方案
DROPMALFORMED 模式确实会删除不符合架构的记录(在您的情况下,记录的值为非数字),但是它忽略了可空性。查看这些问题:SPARK-10848、SPARK-25545和这个 pull request以了解其基本原理。如您所见,它不受欢迎,有些人试图修复它,但到目前为止还没有成功。
然后,您需要显式处理您身边的空记录。
val schema = StructType(Seq(
StructField("key", StringType, true),
StructField("value", DoubleType, false),
StructField("corruptRecord", StringType, true)
))
val df = spark.read.schema(schema).
option("mode", "PERMISSIVE").
option("columnNameOfCorruptRecord", "corruptRecord").
csv("abc.csv")
def isValid = 'corruptRecord.isNull && 'value.isNotNull
val validDf = df.filter(isValid)
val invalidDf = df.filter(!isValid)
isValid
可以用更通用的方式定义:
def isValid = schema.filter(!_.nullable)
.foldLeft('corruptRecord.isNull)((acc, f) => acc && col(f.name).isNotNull)
至少当您没有嵌套结构/数组时。
推荐阅读
- android - 推送通知不适用于 NotificationCompat.BigPictureStyle(无法放大通知视图)
- spectre - Spectre V1 PoC 代码
- r - 使用 dplyr 为给定组创建唯一值组合的向量
- entity-framework - 实体框架配置 addorupdate 多对多种子记录
- javascript - (修改 JSFiddle)如何使下拉框仅根据您在上一个下拉框中选择的内容可见?
- c# - 如何在 Crystal Report 运行时 13.2.469 中运行我的 Crystal Report
- android - 在路径上找不到类“com.google.android.gms.common.internal.zzbq”:DexPathList
- webhooks - 通过 slack webhook 将消息发送到不同的通道失败
- jdbc - Logstash Jdbc_streaming 过滤器插件使用“参数”选项返回一个空集
- c#-4.0 - 如何获取 tfs 中用户故事的“测试者”部分下的测试用例的名称和 ID