Flink Avro state evolution

Problem description

I have a Flink topology that uses a ListState[MyAvroClass] inside a CoProcessFunction.

In my CoProcessFunction, I initialize the state like this:

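import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.formats.avro.typeutils.AvroSerializer

var myState: ListState[MyAvroClass] = _

override def open(parameters: Configuration): Unit = {
  // Serializer for the generated Avro class, passed explicitly to the state descriptor
  val avroSerializer = new AvroSerializer[MyAvroClass](classOf[MyAvroClass])
  myState = getRuntimeContext.getListState[MyAvroClass](
    new ListStateDescriptor[MyAvroClass]("myState", avroSerializer))
}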

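This works fine, but it does not handle Avro schema evolution. Even when I restart the job from a savepoint, deserializing the state fails, for example when I iterate over the ListState iterator.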

I keep getting this error:

java.lang.ArrayIndexOutOfBoundsException: 738197505
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.typeutils.AvroSerializer.deserialize(AvroSerializer.java:201)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:147)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)

It seems to be trying to read a field that does not exist in the previous version of my Avro class. Any idea how to fix this?
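For reference, a minimal sketch of what Avro itself needs to read bytes written with an older schema: the reader must be given both the writer schema (the one the bytes were written with) and the reader schema (the current one). If old bytes are decoded as though they had been written with the new schema, the decoder consumes the wrong bytes for the new field; when that field is a union, an arbitrary value can be read as the branch index, which would be consistent with the ArrayIndexOutOfBoundsException raised from ResolvingDecoder.readIndex above. The two schemas here (an `id` field, plus a new optional `comment` field) are hypothetical stand-ins for the old and new versions of MyAvroClass, the object name `AvroResolutionSketch` is made up for illustration, and the example uses plain Avro generic records rather than Flink state. It assumes the org.apache.avro library is on the classpath.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroResolutionSketch {
  // Hypothetical "old" schema: the version in use when the state was written.
  val oldSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"MyAvroClass","fields":[
      |  {"name":"id","type":"string"}
      |]}""".stripMargin)

  // Hypothetical "new" schema: adds a nullable field with a default value.
  val newSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"MyAvroClass","fields":[
      |  {"name":"id","type":"string"},
      |  {"name":"comment","type":["null","string"],"default":null}
      |]}""".stripMargin)

  // Encode one record in Avro binary format using the old schema.
  def writeWithOldSchema(id: String): Array[Byte] = {
    val record = new GenericData.Record(oldSchema)
    record.put("id", id)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](oldSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decode with BOTH schemas: writer = old, reader = new.
  // Avro's schema resolution fills the missing "comment" field from its default.
  def readResolved(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](oldSchema, newSchema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val rec = readResolved(writeWithOldSchema("a"))
    // "id" comes back as an Avro Utf8; "comment" is the default (null)
    println(s"${rec.get("id")} ${rec.get("comment")}")
  }
}
```

In a Flink job, the writer schema has to come from the savepoint itself, so whether this works on restore depends on the serializer recording the old schema in its snapshot. Flink added schema evolution support for Avro state types in 1.7, so the Flink version in use is worth checking.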

Tags: scala, apache-kafka, apache-flink, avro, flink-streaming
