java - 如何修复 org.apache.avro.message.BadHeaderException: Unrecognized header bytes: in Java Kafka consumer?
问题描述
我正在为 Kafka 消费者实现 Avro 反序列化器。但是,当我运行消费者时,我在反序列化值时收到错误消息,指出存在无法识别的标头字节。
首先,我决定检查导致错误的相关记录。我在为 kafka 使用 3rd 方可视化工具时做到了这一点。我可以看到记录是正确的,并且第 3 方工具可以正确地使用来自主题的 Avro 消息。所以这告诉我问题可能与我的反序列化器的实现有关。
这是我对 Deserializer 类的实现
public class UserRecordDeserializer implements Deserializer<userRecord> {
...
@Override
public userRecord deserialize(String topic, byte[] data){
if (data == null){
return null;
}
try {
ByteBuffer userRecordBuffer = ByteBuffer.wrap(data);
userRecord newRecord = decodeUserRecord(userRecordBuffer);
return newRecord;
} catch (IOException | RuntimeException e) {
throw new RuntimeException("Error deserializing value", e);
}
}
private userRecord decodeUserRecord(java.nio.ByteBuffer data) throws IOException{
try {
userRecord temp = new userRecord();
return temp.fromByteBuffer(data);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
这是 Avro 生成类的 DECODER 和 fromByteBuffer 方法。
private static final BinaryMessageDecoder<userRecord> DECODER =
new BinaryMessageDecoder<userRecord>(MODEL$, SCHEMA$);
public static userRecord fromByteBuffer(
java.nio.ByteBuffer b) throws java.io.IOException {
return DECODER.decode(b);
}
Caused by: java.lang.RuntimeException: Error deserializing value
at com.package.serdes.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:30)
at com.package.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:11)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1030)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1250)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1099)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:545)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:506)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1269)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
at com.package.consumer.ActivitiesKafkaSimpleConsumerImpl.run(ActivitiesKafkaSimpleConsumerImpl.java:25)
at com.package.RunConsumer.main(RunConsumer.java:11)
Caused by: org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x00
at org.apache.avro.message.BinaryMessageDecoder.decode(BinaryMessageDecoder.java:147)
at org.apache.avro.message.MessageDecoder$BaseDecoder.decode(MessageDecoder.java:139)
at org.apache.avro.message.MessageDecoder$BaseDecoder.decode(MessageDecoder.java:127)
at com.package.models.userRecord.fromByteBuffer(userRecord.java:71)
at com.package.serdes.UserRecordDeserializer.decodeUserRecord(UserRecordDeserializer.java:42)
at com.package.serdes.UserRecordDeserializer.deserialize(UserRecordDeserializer.java:27)
... 13 more
解决方案
推荐阅读
- javascript - 带有有效负载的 axios 删除方法
- kotlin - Kotlin 下载并显示 PDF
- python - python lambda函数中的导入模块
- android - ARCore transformCoordinates2d() 从视图到 CPU 图像的结果?
- c - 如何将字符串传递给 web 服务器上的 test.cgi?
- javascript - 尽管 sql.close() 已经存在全局连接
- sql - 将聚合函数 SUM 与列相乘?
- typescript - 无法在泛型类型上使用 Partial
- sql - Postgres多个列上的多个谓词
- javascript - 故事书自定义字体未显示