首页 > 解决方案 > 如何修复 org.apache.avro.message.BadHeaderException: Unrecognized header bytes: in Java Kafka consumer?

问题描述

我正在为 Kafka 消费者实现 Avro 反序列化器。但是,当我运行消费者时,我在反序列化值时收到错误消息,指出存在无法识别的标头字节。

首先,我决定检查导致错误的相关记录。我在为 kafka 使用 3rd 方可视化工具时做到了这一点。我可以看到记录是正确的,并且第 3 方工具可以正确地使用来自主题的 Avro 消息。所以这告诉我问题可能与我的反序列化器的实现有关。

这是我对 Deserializer 类的实现

public class UserRecordDeserializer implements Deserializer<userRecord> {
    ...
    @Override
    public userRecord deserialize(String topic, byte[] data){
        if (data == null){
            return null;
        }

        try {
            ByteBuffer userRecordBuffer = ByteBuffer.wrap(data);
            userRecord newRecord = decodeUserRecord(userRecordBuffer);
            return newRecord;
        } catch (IOException | RuntimeException e) {
            throw new RuntimeException("Error deserializing value", e);
        }

    }

    private userRecord decodeUserRecord(java.nio.ByteBuffer data) throws IOException{
        try {
            userRecord temp = new userRecord();
            return temp.fromByteBuffer(data);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

这是 Avro 生成类的 DECODER 和 fromByteBuffer 方法。


  private static final BinaryMessageDecoder<userRecord> DECODER =
      new BinaryMessageDecoder<userRecord>(MODEL$, SCHEMA$);

  public static userRecord fromByteBuffer(
      java.nio.ByteBuffer b) throws java.io.IOException {
    return DECODER.decode(b);
  }

Caused by: java.lang.RuntimeException: Error deserializing value
    at com.package.serdes.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:30)
    at com.package.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:11)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at com.package.consumer.ActivitiesKafkaSimpleConsumerImpl.run(ActivitiesKafkaSimpleConsumerImpl.java:25)
    at com.package.RunConsumer.main(RunConsumer.java:11)
Caused by: org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x00
    at org.apache.avro.message.BinaryMessageDecoder.decode(BinaryMessageDecoder.java:147)
    at org.apache.avro.message.MessageDecoder$BaseDecoder.decode(MessageDecoder.java:139)
    at org.apache.avro.message.MessageDecoder$BaseDecoder.decode(MessageDecoder.java:127)
    at com.package.models.userRecord.fromByteBuffer(userRecord.java:71)
    at com.package.serdes.UserRecordDeserializer.decodeUserRecord(UserRecordDeserializer.java:42)
    at com.package.serdes.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:27)
    ... 13 more

标签: javaapache-kafkaavro

解决方案


推荐阅读