Cannot access deserialized nested Avro GenericRecord elements in Scala

Problem description

I am using Structured Streaming (Spark 2.4.0) to read Avro messages from Kafka, with the Confluent Schema Registry supplying the schema.

I am unable to access the deeply nested fields.

The schema, in compacted .avsc form, looks like this:

{"type":"record","name":"KafkaMessage","namespace":"avro.pojo","fields":[{"name":"context","type":["null",{"type":"record","name":"Context","fields":[{"name":"businessInteractionId","type":["null","string"]},{"name":"referenceNumber","type":["null","string"]},{"name":"serviceName","type":["null","string"]},{"name":"status","type":["null","string"]},{"name":"sourceSystems","type":["null",{"type":"array","items":{"type":"record","name":"SourceSystem","fields":[{"name":"orderId","type":["null","string"]},{"name":"revisionNumber","type":["null","string"]},{"name":"systemId","type":["null","string"]}]}}]},{"name":"sysDate","type":["null","string"]}]}]}]}

As parsed in Spark:

      context
     |-- businessInteractionId: string (nullable = true)
     |-- referenceNumber: string (nullable = true)
     |-- serviceName: string (nullable = true)
     |-- sourceSystems: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- orderId: string (nullable = true)
     |    |    |-- revisionNumber: string (nullable = true)
     |    |    |-- systemId: string (nullable = true)
     |-- status: string (nullable = true)
     |-- sysDate: string (nullable = true)

My approach: cast the returned object to a GenericRecord, and cast the array to GenericData.Array[GenericRecord] Link

Code

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.common.serialization.Deserializer
import org.apache.spark.sql.functions.col

val client = new CachedSchemaRegistryClient(schemaRegUrl, 100)
val brdDeser = spark.sparkContext.broadcast(
  new KafkaAvroDeserializer(client).asInstanceOf[Deserializer[GenericRecord]])

val results = df.select(col("value").as[Array[Byte]]).map { rawBytes: Array[Byte] =>
  // read the raw bytes from Spark and then use the Confluent deserializer to get the record back
  val deser = brdDeser.value
  val decoded = deser.deserialize(topics, rawBytes)
  val context_GR = decoded.get("context").asInstanceOf[GenericRecord]
  val c_businessInteractionId =
    context_GR.get("businessInteractionId").toString // this works
  val c1_sourceSystems = context_GR
    .get("sourceSystems")
    .asInstanceOf[GenericData.Array[GenericRecord]]
  val c_orderId = c1_sourceSystems.get(0).get("orderId").toString // NullPointerException
  val c_revisionNumber = c1_sourceSystems.get(0).get("revisionNumber").toString
  val c_systemId = c1_sourceSystems.get(0).get("systemId").toString
  new CaseMessage(c_businessInteractionId, c_orderId, c_revisionNumber, c_systemId)
}

case class CaseMessage(c_businessInteractionId: String,
                       c_orderId: String,
                       c_revisionNumber: String,
                       c_systemId: String)

Every time, I get a java.lang.NullPointerException when it tries to evaluate c_orderId.
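Note that in this schema every field is a union with "null", so a GenericRecord.get at any level — sourceSystems itself, an array element, or a scalar like orderId — can legitimately return null, and calling .toString on it is what throws the NPE. Here is a minimal sketch of the failure mode and a null-safe lookup; it uses a plain Map as a stand-in for GenericRecord (no Avro dependency), and the names rec and field are mine, not from the post:

```scala
// Stand-in for a GenericRecord whose nullable union fields may resolve to null.
val rec: Map[String, AnyRef] = Map("orderId" -> null, "systemId" -> "SYS1")

// Option(_) turns a possibly-null reference into Some/None,
// so .toString is only ever called on a real value.
def field(name: String): Option[String] =
  rec.get(name).flatMap(v => Option(v)).map(_.toString)

val orderId  = field("orderId").getOrElse("")  // "" instead of an NPE
val systemId = field("systemId").getOrElse("") // "SYS1"
```

The same flatMap/Option chaining extends naturally to the nested levels (context, sourceSystems, element 0) so that a null anywhere along the path short-circuits to None instead of throwing.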

Tags: scala, spark-streaming, avro, confluent-schema-registry, spark-avro

Solution


This was a data issue. I was able to resolve it by performing a null check:

val c_orderId = if (c1_sourceSystems.get(0).get("orderId") != null) {
  c1_sourceSystems.get(0).get("orderId").toString
} else ""