scala - Spark/Scala - 在流式 DataFrame 的一行中验证 JSON 文档
问题描述
我有一个流式应用程序,它正在处理一个流式 DataFrame,其列“body”包含一个 JSON 字符串。
所以在正文中是这样的(这些是四个输入行):
{"id":1, "ts":1557994974, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3,"attr3":"something"}]}
{"id":2, "ts":1557994975, "details":[{"id":1,"attr2":"3","attr3":"something"}, {"id":2,"attr2":"3","attr3":"something"},{"id":3,"attr2":"3","attr3":"something"}]}
{"id":3, "ts":1557994976, "details":[{"id":1,"attr2":3,"attr3":"something"}, {"id":2,"attr2":3}]}
{"id":4, "ts":1557994977, "details":[]}
我想检查每一行是否具有正确的架构(数据类型并包含所有属性)。我想过滤掉并在某处记录无效记录(如 Parquet 文件)。我对“详细信息”数组特别感兴趣——每个嵌套文档都必须具有指定的字段和正确的数据类型。
所以在上面的例子中,只有 row id = 1 是有效的。
我正在考虑一个案例类,例如:
case class Detail(
id: Int,
attr2: Int,
attr3: String
)
case class Input(
id: Int,
ts: Long,
details: Seq[Detail]
)
并尝试但不确定如何去做。
有人可以帮忙吗?
谢谢
解决方案
一种方法是使用JSON Schema,它可以帮助您对数据进行模式验证。如果您是新手,入门页面是一个很好的起点。
另一种方法大致如下
为您在问题中尝试过的每个对象构建模型(案例类)。
使用像Spray JSON / Play-JSON这样的 JSON 库来解析输入 json。
对于所有无法解析为有效记录的输入,很可能是无效的,您可以将这些输出分区到 Spark 代码中的不同接收器中。如果您
isValid
在对象上有一个方法可以验证解析的记录是否正确,它也将使其变得健壮。
推荐阅读
- sql - Access - SQL 查询将库存分配给订单
- ios - IOS 应用程序在审核时崩溃(异常类型:EXC_BREAKPOINT (SIGTRAP))
- python - 我如何有效地找出有多少数不能从素数列表中整除?
- javascript - 如何添加确认消息 ASP 按钮单击
- java - 使用 SwipRefresh 库刷新 recyclerview 时出现 onDatachange 错误
- android - 对象在发送到另一个 Fragment 后发生变化
- evolutionary-algorithm - DEAP进化模块,总是评估整个种群
- python-3.x - Selenium 运行 Chrome 配置文件但不运行其余脚本
- python - Discord.py 大写和小写
- python - 将屏幕管理器小部件与 MapView 集成