apache-kafka - 在 KStream DSL 中使用转换时出现 Avro java.io.EOFException
问题描述
我需要在 KStream 中使用 transform() 操作,但是在未设置必要的 serdes 时会出现通常的 ClassNotFoundException:
Caused by: java.lang.ClassCastException: xxx.SomeKey cannot be cast to [B
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:161)
val someKeySerde = SpecificAvroSerde<SomeKeySerde>()
someKeySerde.configure(serdeConfig, false)
val someValueSerde = SpecificAvroSerde<SomeValueSerde>()
someValueSerde.configure(serdeConfig, false)
val someExtendedValueSerde = SpecificAvroSerde<SomeExtendedValueSerde>()
someExtendedValueSerde.configure(serdeConfig, false)
myKStream
.transform(TransformerSupplier {
object : Transformer<SomeKey, SomeValue, KeyValue<SomeKey, SomeValue>> {
private lateinit var context: ProcessorContext
override fun close() {
}
override fun transform(key: SomeKey, value: SomeValue): KeyValue<SomeKey, SomeValue> {
println("@@@@@@@@@@@@ timestamp ${context.timestamp()}")
...
return KeyValue(key, enrichedValue)
}
override fun init(context: ProcessorContext) {
this.context = context
}
}
}).groupByKey()
.aggregate(getSomeValueAggregationInitializer(),
getAggregator("absolute"),
materializedAbsoluteSomeValueFrequency)
其他 KStream 操作允许我们指定要使用的 serdes,但不是在转换的情况下。我该如何设置它们?(如您在上面看到的,SpecificAvroSerde)?
更新:正如 Matthias 所指出的,问题是在转换之后的 groupByKey 操作中缺少 Serdes。我已经用新问题更新了问题标题。
1)为什么没有Grouped.with(clientProjectIdSerde, deploymentFinishedSerde)
transform()调用但我需要在transform()之后添加它时它可以工作?
如果我更新 groupById 以包含 GroupedWith ->.groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))
现在我可以看到通话记录("@@@@@@@@@@@@ ...
,但出现了一个新问题:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:160)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:184)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
如果我在没有它的情况下留下完全相同的代码.transform()
:
myKStream
.groupByKey(Grouped.with(clientProjectIdSerde, deploymentFinishedSerde))
.aggregate(getSomeValueAggregationInitializer(),
getAggregator("absolute"),
materializedAbsoluteSomeValueFrequency)
2) 为什么我在使用 transform() 时会出现这个 Avro 异常,但如果没有它,我该如何解决?
从 SomeValue Avro 对象读取 dateTime long 字段时会发生错误:
{
"namespace" : "xxx",
"type" : "record",
"name" : "SomeValue",
"fields" : [
{"name": "name", "type": "string"},
{"name":"dateTime", "type": "long", "logicalType": "timestamp-millis"}
]
}
解决方案
Matthias 在评论中提供了第一个问题的答案。
关于第二个,问题出现的实际场景是在测试期间。与普通的 Kafka Broker + Schema Registry 相比,它运行良好。
问题出在io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
我在测试期间使用的类中。
这是模式注册的流程:
1) 首先,它从输入主题中读取一条记录,并为 SomeKey 和 id 2 注册了一个模式。
2)后transform()
处理逻辑使用了一个repartition topic。它尝试序列化密钥。为此,该MockSchemaRegistryClient.getIdFromRegistry()
方法为键和值的模式生成了错误的 id (-1)。然后,当它尝试序列化密钥时,它使用了 -1 id,该 id 首先分配给了密钥,但后来覆盖了值模式。因此,它试图用值的模式序列化键。这就是 Avro 异常的原因。
我正在使用 io.confluent:kafka-schema-registry-client:5.2.1。升级到 5.2.2 问题就消失了。这是修复的提交:https ://github.com/confluentinc/schema-registry/commit/6ef5d4a523a5eedff0fa32bea1e1405be42efc13#diff-e5caaf947bc9ff275003783d5d50eee6R90