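java - Serialization error in a Kafka Avro consumer using Spring Boot
Problem description
I created a Kafka Avro producer and a consumer with Spring Boot as two separate projects. While consuming data, I get the following exception: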
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition bookavro-0 at offset 3. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class com.dailycodebuffer.kafka.apachekafkaproducerdemo.BookAvro specified in writer's schema whilst finding reader's schema for a SpecificRecord.
2020-12-30 18:44:09.032 ERROR 22344 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145) ~[spring-kafka-2.6.4.jar:2.6.4]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.6.4.jar:2.6.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1425) [spring-kafka-2.6.4.jar:2.6.4]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1122) [spring-kafka-2.6.4.jar:2.6.4]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_202]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_202]
    at java.lang.Thread.run(Thread.java:813) [na:1.8.0_202]
com.dailycodebuffer.kafka.apachekafkaproducerdemo.BookAvro is the class, with its package, from the producer project.
Here is my consumer configuration:
@Bean
public ConsumerFactory<String, BookAvro> BookconsumerFactory() {
    System.out.println("hi");
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // configProps.put(ConsumerConfig.KEY, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // configProps.put("value.deserializer","org.springframework.kafka.support.serializer.JsonDeserializer");
    // configProps.put(JsonDeserializer.ADD_TYPE_INFO_HEADERS, false);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
    configProps.put("auto.offset.reset", "earliest");
    configProps.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    configProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    System.out.println(configProps.toString());
    return new DefaultKafkaConsumerFactory<String, BookAvro>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, BookAvro> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, BookAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(BookconsumerFactory());
    System.out.println(factory.toString());
    //factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}
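A side note on the consumer configuration above: the IllegalStateException in the log asks for an ErrorHandlingDeserializer. The sketch below shows one way the same properties might be arranged so that Spring Kafka's ErrorHandlingDeserializer wraps the real deserializers. It uses plain string property keys so that it compiles without Kafka on the classpath. The class name BookConsumerProps is hypothetical, and the two delegate keys are assumed to match the spring.deserializer.*.delegate.class constants in spring-kafka 2.6. This does not fix the missing class itself; it only lets the container's error handler receive the failed record instead of failing on the same offset again and again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original project.
final class BookConsumerProps {

    static Map<String, Object> build() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group_json");
        props.put("auto.offset.reset", "earliest");

        // The wrapper is what the Kafka consumer instantiates...
        props.put("key.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");

        // ...and it delegates to the deserializers from the original configuration.
        props.put("spring.deserializer.key.delegate.class",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("spring.deserializer.value.delegate.class",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");

        // Confluent Avro settings, unchanged from the original configuration.
        props.put("schema.registry.url", "http://localhost:8081");
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties for inspection.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting map could be passed to new DefaultKafkaConsumerFactory<>(BookConsumerProps.build()) in place of configProps.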
Here is the producer configuration:
@Bean
public ProducerFactory<String, BookAvro> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    configProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    // configProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    // configProps.put(KafkaAvroSerializerConfig., "true");
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, BookAvro> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Here is the Kafka listener:
@KafkaListener(groupId = "group_json", topics = "bookavro")
public void consumeBook(BookAvro book) {
    System.out.println("message3" + book.toString());
}
BookAvro is an Avro class generated from an .avsc file. Can anyone help me resolve this exception?
Solution
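The innermost cause in the trace is the third "Caused by". With specific.avro.reader set to true, the Confluent deserializer takes the full name recorded in the writer's schema (namespace plus record name) and tries to load a class with exactly that name on the consumer's classpath. The producer registered the schema under com.dailycodebuffer.kafka.apachekafkaproducerdemo, and the consumer project apparently keeps its BookAvro in a different package, so the lookup fails. The usual remedy is to generate the consumer's BookAvro from the same .avsc with the same namespace, for example by sharing the schema file or a small common module between the two projects. The sketch below only imitates that name-based lookup with the standard library to show why the package matters; ReaderClassLookup and its methods are hypothetical names, not Confluent or Avro API.

```java
// Hypothetical illustration; not the Confluent or Avro implementation.
final class ReaderClassLookup {

    // An Avro record's full name is its namespace joined to its name.
    static String fullName(String namespace, String name) {
        return namespace.isEmpty() ? name : namespace + "." + name;
    }

    // True only if a class with exactly this name can be loaded here.
    static boolean isLoadable(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String writerName = fullName(
                "com.dailycodebuffer.kafka.apachekafkaproducerdemo", "BookAvro");
        // In a project that lacks this package the lookup fails, which is
        // the situation the SerializationException reports.
        System.out.println(writerName + " loadable: " + isLoadable(writerName));
    }
}
```

Once the consumer has a BookAvro class under the same namespace as the writer's schema, a lookup of this kind can succeed and the listener can receive a BookAvro instance.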