首页 > 解决方案 > Spark/Scala - 在流式 DataFrame 的一行中验证 JSON 文档

问题描述

我有一个流式应用程序,它正在处理一个流式 DataFrame,其列“body”包含一个 JSON 字符串。

所以在正文中是这样的(这些是四个输入行):

{"id":1, "ts":1557994974, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3,"attr3":"something"}]}
{"id":2, "ts":1557994975, "details":[{"id":1,"attr2":"3","attr3":"something"}, {"id":2,"attr2":"3","attr3":"something"},{"id":3,"attr2":"3","attr3":"something"}]}
{"id":3, "ts":1557994976, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3}]}
{"id":4, "ts":1557994977, "details":[]}

我想检查每一行是否具有正确的架构(数据类型并包含所有属性)。我想过滤掉并在某处记录无效记录(如 Parquet 文件)。我对“详细信息”数组特别感兴趣——每个嵌套文档都必须具有指定的字段和正确的数据类型。

所以在上面的例子中,只有 row id = 1 是有效的。

我正在考虑一个案例类,例如:

case class Detail(
  id: Int,
  attr2: Int,
  attr3: String
)

case class Input(
  id: Int,
  ts: Long,
  details: Seq[Detail]
)

并尝试但不确定如何去做。

有人可以帮忙吗?

谢谢

标签: scalaapache-spark

解决方案


一种方法是使用JSON Schema,它可以帮助您对数据进行模式验证。如果您是新手,入门页面是一个很好的起点

另一种方法大致如下

  1. 为您在问题中尝试过的每个对象构建模型(案例类)。

  2. 使用像Spray JSON / Play-JSON这样的 JSON 库来解析输入 json。

  3. 对于所有无法解析为有效记录的输入,很可能是无效的,您可以将这些输出分区到 Spark 代码中的不同接收器中。如果您isValid在对象上有一个方法可以验证解析的记录是否正确,它也将使其变得健壮。


推荐阅读