apache-spark - Spark Kryo 序列化
问题描述
我们有一个 Spark Structured Streaming 应用程序,它使用 Avro 格式的 Kafka 主题。有效负载是函数中state
对象的一部分mapGroupWithState
。鉴于我们强制FULL
对 Avro 模式进行兼容性,我们在发展模式时通常不会遇到问题。然而,我们现在已经通过添加嵌套对象来改进我们的模式,并且 Kryo 序列化失败并出现以下内容,xyz
该字段是其中的嵌套对象ObjectV1
:
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 020-12-14T19:23:49
Serialization trace:
xyz (x.y.z.ObjectV1)
Logical Plan:
SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#38]
+- MapPartitions <function1>, obj#37: scala.Tuple2
+- DeserializeToObject decodeusingserializer(cast(value#34 as binary), scala.Option, true), obj#36: scala.Option
+- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#34]
+- FlatMapGroupsWithState <function3>, newInstance(class scala.Tuple4), decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [_1#29, _2#30, _3#31, _4#32L], [value#23], obj#33: scala.Option, class[value[0]: binary], Update, true, ProcessingTimeTimeout
+- AppendColumns <function1>, class scala.Tuple2, [StructField(value,BinaryType,true)], decodeusingserializer(cast(value#23 as binary), scala.Tuple2, true), [assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._1 AS _1#29, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._2, true, false) AS _2#30, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._3 AS _3#31, assertnotnull(assertnotnull(input[0, scala.Tuple4, true]))._4 AS _4#32L]
+- SerializeFromObject [encodeusingserializer(input[0, java.lang.Object, true], true) AS value#23]
+- MapElements <function1>, interface org.apache.spark.sql.Row, [StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)], obj#22: scala.Tuple2
+- DeserializeToObject createexternalrow(key#7, value#8, topic#9.toString, partition#10, offset#11L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#12, true, false), timestampType#13, StructField(key,BinaryType,true), StructField(value,BinaryType,true), StructField(topic,StringType,true), StructField(partition,IntegerType,true), StructField(offset,LongType,true), StructField(timestamp,TimestampType,true), StructField(timestampType,IntegerType,true)), obj#21: org.apache.spark.sql.Row
+- StreamingExecutionRelation KafkaV2[Subscribe[ext_object_v1]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
Spark 版本是 2.4.5。有没有人遇到过类似的事情?删除检查点文件夹可以解决这个问题,但我们自然希望避免这种情况。
解决方案
推荐阅读
- c# - 是否有消息“抱歉,好像出了点问题”。使用收据卡时在频道 Line 和 Messengers 上
- python - 在本地 PyCharm 中运行 dev_appserver.py 时断点不起作用
- r - 聚合具有重叠日期范围的行的列值
- javascript - Vue修改DOM时如何调用函数?
- json - Flutter:setState 在第一次尝试时工作,但在响应 = 等待后没有工作
- pandas - 如何编写与 cutoff_time 一起使用的 seed_features
- vba - 在 MS Project 中使用 VBA 更改字段中的值
- python - 如何以“2019 年 8 月 21 日星期三 8:13 PM”格式打印日期时间?
- python - 更新 - 在找到标准后丢弃行时在熊猫中分组数据框
- angular - 异步管道不适用于相同的 observable