scala - Flink Avro 状态演变
问题描述
我有一个在 CoProcessFunction 中使用 ListState[MyAvroClass] 的 flink 拓扑。
在我的 CoProcessFunction 中,我初始化
var myState: ListState[MyAvroClass] = _
override def open(parameters: Configuration): Unit = {
val avroSerializer = new AvroSerializer[MyAvroClass](classOf[MyAvroClass])
myState = getRuntimeContext.getListState[MyAvroClass](new ListStateDescriptor[MyAvroClass]("myState", avroSerializer))
}
它运行正常,但我无法处理 avro 模式演变。即使我从保存点重新开始我的工作,我也有反序列化数据的问题,例如当我遍历 listState 迭代器时。
我一直有错误:
java.lang.ArrayIndexOutOfBoundsException: 738197505
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:201)
at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:147)
at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
似乎他正在尝试获取我的 avro 类的先前版本中不存在的字段。知道如何解决这个问题吗?
解决方案
推荐阅读
- linq - 字符串替换所有与字符串列表匹配的子字符串 C# LINQ
- sql - 根据不同的条件对不同表的列进行计数,并通过公共列连接 (SQL) 进行分组
- c++ - 函数采用指向方法的指针,而不考虑 constness
- javascript - ReactJS how do i save input file multiple image base64
- node.js - 在这种情况下,以下 npm 文档中的“全局运行 npm 包”是什么意思?
- log4j2 - 带有控制台的 log4j2 xml 配置
- c - 使用 O_CREAT O_WRONLY :如何获取文件中的当前位置并在末尾附加数据?
- javascript - 如何在使用react的useeffect返回语句中使用react.usecallback?
- c# - 如何在 C# 中使用 restapi 从 github 为用户(人)获取数据?
- java - 如何根据页眉/页脚高度动态调整页边距