首页 > 解决方案 > 在 KStream DSL 中使用转换时出现 Avro java.io.EOFException

问题描述

我需要在 KStream 中使用 transform() 操作,但是在未设置必要的 serdes 时会出现通常的 ClassNotFoundException:

Caused by: java.lang.ClassCastException: xxx.SomeKey cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
val someKeySerde = SpecificAvroSerde<SomeKeySerde>()
someKeySerde.configure(serdeConfig, false)
val someValueSerde = SpecificAvroSerde<SomeValueSerde>()
someValueSerde.configure(serdeConfig, false)
val someExtendedValueSerde = SpecificAvroSerde<SomeExtendedValueSerde>()
someExtendedValueSerde.configure(serdeConfig, false)

myKStream
    .transform(TransformerSupplier {
        object : Transformer<SomeKey, SomeValue, KeyValue<SomeKey, SomeValue>> {

            private lateinit var context: ProcessorContext

            override fun close() {
            }

            override fun transform(key: SomeKey, value: SomeValue): KeyValue<SomeKey, SomeValue> {
                println("@@@@@@@@@@@@ timestamp ${context.timestamp()}")
                ...   
                return KeyValue(key, enrichedValue)
            }

            override fun init(context: ProcessorContext) {
                this.context = context
            }

        }
    }).groupByKey()
      .aggregate(getSomeValueAggregationInitializer(),
                        getAggregator("absolute"),
                        materializedAbsoluteSomeValueFrequency)

其他 KStream 操作允许我们指定要使用的 serdes,但不是在转换的情况下。我该如何设置它们?(如您在上面看到的,SpecificAvroSerde)?

更新:正如 Matthias 所指出的,问题是在转换之后的 groupByKey 操作中缺少 Serdes。我已经用新问题更新了问题标题。

1)为什么没有Grouped.with(clientProjectIdSerde, deploymentFinishedSerde)transform()调用但我需要在transform()之后添加它时它可以工作?

如果我更新 groupById 以包含 GroupedWith ->.groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))现在我可以看到通话记录("@@@@@@@@@@@@ ...,但出现了一个新问题:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
    at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:160)
    at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:184)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)

如果我在没有它的情况下留下完全相同的代码.transform()

myKStream
    .groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))
      .aggregate(getSomeValueAggregationInitializer(),
                        getAggregator("absolute"),
                        materializedAbsoluteSomeValueFrequency)

2) 为什么我在使用 transform() 时会出现这个 Avro 异常,但如果没有它,我该如何解决?

从 SomeValue Avro 对象读取 dateTime long 字段时会发生错误:

{
  "namespace" : "xxx",
  "type" : "record",
  "name" : "SomeValue",
  "fields" : [
    {"name": "name", "type": "string"},
    {"name":"dateTime", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

标签: apache-kafkaavroapache-kafka-streams

解决方案


Matthias 在评论中提供了第一个问题的答案。

关于第二个,问题出现的实际场景是在测试期间。与普通的 Kafka Broker + Schema Registry 相比,它运行良好。

问题出在io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient我在测试期间使用的类中。

这是模式注册的流程:

1) 首先,它从输入主题中读取一条记录,并为 SomeKey 和 id 2 注册了一个模式。

2)后transform()处理逻辑使用了一个repartition topic。它尝试序列化密钥。为此,该MockSchemaRegistryClient.getIdFromRegistry()方法为键和值的模式生成了错误的 id (-1)。然后,当它尝试序列化密钥时,它使用了 -1 id,该 id 首先分配给了密钥,但后来覆盖了值模式。因此,它试图用值的模式序列化键。这就是 Avro 异常的原因。

我正在使用 io.confluent:kafka-schema-registry-client:5.2.1。升级到 5.2.2 问题就消失了。这是修复的提交:https ://github.com/confluentinc/schema-registry/commit/6ef5d4a523a5eedff0fa32bea1e1405be42efc13#diff-e5caaf947bc9ff275003783d5d50eee6R90


推荐阅读