Spark Kryo Serialization

Problem Description

We have a Spark Structured Streaming application that consumes a Kafka topic in Avro format. The payload is part of the state object in a mapGroupsWithState function. Since we enforce FULL compatibility on our Avro schemas, we normally have no problems evolving the schema. However, we have now evolved the schema by adding a nested object, and Kryo serialization fails with the error below, where the field xyz is the nested object inside ObjectV1 (a minimal sketch of this setup follows the logical plan):

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 020-12-14T19:23:49
Serialization trace:
xyz (x.y.z.ObjectV1)
Logical Plan:
SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#38]
+- MapPartitions <function1>, obj#37: scala.Tuple2
   +- DeserializeToObject decodeusingserializer(cast(value#34 as binary), scala.Option, true), obj#36: scala.Option
      +- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#34]
         +- FlatMapGroupsWithState <function3>, newInstance(class scala.Tuple4), decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [_1#29, _2#30, _3#31, _4#32L], [value#23], obj#33: scala.Option, class[value[0]: binary], Update, true, ProcessingTimeTimeout
            +- AppendColumns <function1>, class scala.Tuple2, [StructField(value,BinaryType,true)], decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._1 AS _1#29, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._2, true, false) AS _2#30, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._3 AS _3#31, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._4 AS _4#32L]
               +- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#23]
                  +- MapElements <function1>, interface org.apache.spark.sql.Row, [StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)], obj#22: scala.Tuple2
                     +- DeserializeToObject createexternalrow(key#7, value#8, topic#9.toString, partition#10, offset#11L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#12, true, false), timestampType#13, StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)), obj#21: org.apache.spark.sql.Row
                        +- StreamingExecutionRelation KafkaV2[Subscribe[ext_object_v1]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
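
For context, here is a minimal sketch of the kind of pipeline described above. The Nested and ObjectV1 case classes, the StatefulJob wrapper, and the merge logic are hypothetical stand-ins for the real types; the Kryo encoders correspond to the encodeusingserializer/decodeusingserializer nodes visible in the plan.

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical stand-ins for the real payload; xyz is the newly added nested object.
case class Nested(createdAt: String)
case class ObjectV1(id: String, xyz: Option[Nested])

object StatefulJob {
  def run(spark: SparkSession, events: Dataset[(String, ObjectV1)]): Dataset[Option[ObjectV1]] = {
    import spark.implicits._

    // Kryo encoders for the state and the output, matching the
    // encodeusingserializer/decodeusingserializer nodes in the plan.
    implicit val stateEnc: Encoder[ObjectV1] = Encoders.kryo[ObjectV1]
    implicit val outEnc: Encoder[Option[ObjectV1]] = Encoders.kryo[Option[ObjectV1]]

    events
      .groupByKey(_._1)
      .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
        (key: String, rows: Iterator[(String, ObjectV1)], state: GroupState[ObjectV1]) =>
          // Keep the most recent payload as state; the real job would merge instead.
          val latest = rows.map(_._2).foldLeft(state.getOption)((_, v) => Some(v))
          latest.foreach { v =>
            state.update(v)
            state.setTimeoutDuration("1 hour") // arbitrary duration for the sketch
          }
          latest
      }
  }
}

Because the state encoder is Kryo-based, the bytes written into the checkpoint are tied to the exact structure of ObjectV1, which is why evolving the class can make previously checkpointed state unreadable.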

The Spark version is 2.4.5. Has anyone run into something similar? Deleting the checkpoint folder works around the problem, but we would naturally like to avoid that.

Tags: apache-spark, spark-streaming, avro, apache-spark-2.0, kryo

Solution
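
A sketch of one commonly suggested mitigation (an illustration, not an answer recorded with the original question): keep the checkpointed state as a plain byte array and let the application encode and decode it itself, so that evolving ObjectV1 no longer invalidates bytes that Kryo wrote into earlier checkpoints. The StateV1 case class and StateCodec below are hypothetical; a real job would serialize with the existing Avro schema, whose FULL compatibility already guarantees that old state bytes remain readable after adding the nested field.

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical state type; the real job would use ObjectV1.
case class StateV1(id: String, count: Long)

// Hypothetical application-owned codec. A hand-rolled binary layout keeps the
// sketch self-contained; in practice the existing Avro schema is the natural choice.
object StateCodec {
  def toBytes(s: StateV1): Array[Byte] = {
    val idBytes = s.id.getBytes(StandardCharsets.UTF_8)
    ByteBuffer.allocate(4 + idBytes.length + 8)
      .putInt(idBytes.length).put(idBytes).putLong(s.count)
      .array()
  }

  def fromBytes(b: Array[Byte]): StateV1 = {
    val buf = ByteBuffer.wrap(b)
    val idBytes = new Array[Byte](buf.getInt())
    buf.get(idBytes)
    StateV1(new String(idBytes, StandardCharsets.UTF_8), buf.getLong())
  }
}

object ByteStateJob {
  def run(spark: SparkSession, events: Dataset[(String, Long)]): Dataset[StateV1] = {
    import spark.implicits._

    events
      .groupByKey(_._1)
      .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
        (key: String, rows: Iterator[(String, Long)], state: GroupState[Array[Byte]]) =>
          val prev = state.getOption.map(StateCodec.fromBytes).getOrElse(StateV1(key, 0L))
          val next = prev.copy(count = prev.count + rows.map(_._2).sum)
          state.update(StateCodec.toBytes(next)) // the checkpoint only ever sees bytes
          next
      }
  }
}

With this layout the checkpoint stores opaque bytes through Spark's stable binary encoder, so adding the nested field becomes purely a codec-level schema change, which the FULL-compatibility rule on the Avro side already governs.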

