scala - Unable to access nested elements of a deserialized Avro GenericRecord in Scala
Problem description
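I am using Structured Streaming (Spark 2.4.0) to read Avro messages from Kafka, with the Confluent Schema Registry supplying the schema.
I cannot access the deeply nested fields.
The schema, in compacted avsc format, looks like this: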
{"type":"record","name":"KafkaMessage","namespace":"avro.pojo","fields":[{"name":"context","type":["null",{"type":"record","name":"Context","fields":[{"name":"businessInteractionId","type":["null","string"]},{"name":"referenceNumber","type":["null","string"]},{"name":"serviceName","type":["null","string"]},{"name":"status","type":["null","string"]},{"name":"sourceSystems","type":["null",{"type":"array","items":{"type":"record","name":"SourceSystem","fields":[{"name":"orderId","type":["null","string"]},{"name":"revisionNumber","type":["null","string"]},{"name":"systemId","type":["null","string"]}]}}]},{"name":"sysDate","type":["null","string"]}]}]}]}
As parsed by Spark:
context
|-- businessInteractionId: string (nullable = true)
|-- referenceNumber: string (nullable = true)
|-- serviceName: string (nullable = true)
|-- sourceSystems: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- orderId: string (nullable = true)
| | |-- revisionNumber: string (nullable = true)
| | |-- systemId: string (nullable = true)
|-- status: string (nullable = true)
|-- sysDate: string (nullable = true)
My approach: cast the returned object to GenericRecord, and cast the array to GenericData.Array[GenericRecord].
Code
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.functions.col
import spark.implicits._ // encoders for Array[Byte] and the case class

case class CaseMessage(c_businessInteractionId: String,
                       c_orderId: String,
                       c_revisionNumber: String,
                       c_systemId: String)

val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
val brdDeser = spark.sparkContext.broadcast(
  new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]])

val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
  // read the raw bytes from spark and then use the confluent deserializer to get the record back
  val deser = brdDeser.value
  val decoded = deser.deserialize(topics, rawBytes)
  val context_GR = decoded.get("context").asInstanceOf[GenericRecord]
  val c_businessInteractionId = context_GR.get("businessInteractionId").toString // this works
  val c1_sourceSystems = context_GR
    .get("sourceSystems")
    .asInstanceOf[GenericData.Array[GenericRecord]]
  val c_orderId = c1_sourceSystems.get(0).get("orderId").toString // NullPointerException
  val c_revisionNumber = c1_sourceSystems.get(0).get("revisionNumber").toString
  val c_systemId = c1_sourceSystems.get(0).get("systemId").toString
  new CaseMessage(c_businessInteractionId, c_orderId, c_revisionNumber, c_systemId)
}
Every time, I get a java.lang.NullPointerException when c_orderId is evaluated.
Solution
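This was a data issue. The schema declares orderId as ["null","string"], so get("orderId") can return null, and calling .toString on that null throws the exception. I was able to resolve it by adding a null check:
val c_orderId = if (c1_sourceSystems.get(0).get("orderId") != null) {
  c1_sourceSystems.get(0).get("orderId").toString
} else null // the original snippet is cut off here; this else branch is an assumption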
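The same null check can also be written with Option, which avoids repeating the lookup and covers a null or empty sourceSystems array as well (it is also a nullable union in the schema). The following is only a minimal sketch, not the original poster's code: the helper names str, firstOf and orderIdOf are hypothetical, the empty-string default is an assumption, and a java.util.Map stands in for GenericRecord so that the example runs without Avro on the classpath.

```scala
import java.util.{List => JList, Map => JMap}

object NullSafeAccess {
  // Render a possibly-null field value as Option[String] instead of calling
  // .toString on it directly, which throws NullPointerException on null.
  def str(value: AnyRef): Option[String] = Option(value).map(_.toString)

  // First element of a list that may itself be null, be empty, or hold a null.
  def firstOf[A](list: JList[A]): Option[A] =
    Option(list).filter(l => !l.isEmpty).flatMap(l => Option(l.get(0)))

  // Hypothetical stand-in for one SourceSystem record (field name -> value).
  type Rec = JMap[String, AnyRef]

  // Assumption: fall back to an empty string when orderId is missing.
  def orderIdOf(sourceSystems: JList[Rec]): String =
    firstOf(sourceSystems).flatMap(r => str(r.get("orderId"))).getOrElse("")
}
```

Because GenericData.Array implements java.util.List, firstOf should accept c1_sourceSystems unchanged, and str can wrap the result of any GenericRecord.get call, for example firstOf(c1_sourceSystems).flatMap(r => str(r.get("orderId"))). Whether an empty string, null, or an Option[String] field on CaseMessage is the right fallback depends on how downstream code treats missing values.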