avro - Spring Cloud Stream kafka avro反序列化
问题描述
我正在编写一个使用 Avro 消息的 Spring Cloud Stream Sink 应用程序。我试图让应用程序使用这些消息,但是我收到以下错误。
org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.avro.generic.GenericRecord] for GenericMessage [payload=byte[304], headers={kafka_offset=42898134, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74f810ba, deliveryAttempt=3, kafka_timestampType=NO_TIMESTAMP_TYPE, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=bam_hrapi, kafka_receivedTimestamp=-1, contentType=avro/bytes}], failedMessage=GenericMessage [payload=byte[304], headers={kafka_offset=42898134, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74f810ba, deliveryAttempt=3, kafka_timestampType=NO_TIMESTAMP_TYPE, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=*****, kafka_receivedTimestamp=-1, contentType=avro/bytes}]
我尝试阅读 spring 参考文档并相应地配置了应用程序。我尝试了各种配置,但仍然出现错误。
下面是我的应用程序配置
@SpringBootApplication
@EnableBinding(Processor.class)
public class UserApp {
public static void main(String[] args) {
SpringApplication.run(UserApp.class, args);
}
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/MapsEvent.avsc"));
return converter;
}
}
和我的流监听器如下
@Component
@Slf4j
public class UserAdditionEventListener {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handleEvents(GenericRecord input) {
System.out.println(input);
return "hello";
}
}
下面是我的 application.properties 文件
spring.cloud.stream.bindings.input.destination=user_topic
spring.cloud.stream.bindings.output.destination=user_output_topic
spring.cloud.stream.kafka.binder.brokers=dd-kafa-100
spring.cloud.stream.default.consumer.headerMode=raw
spring.cloud.stream.bindings.input.content-type=avro/bytes
解决方案
推荐阅读
- java - Map.contains 与 Map.values().stream().filter.find
- laravel - Symfony \ Component \ Debug \ Exception \ FatalThrowableError(E_ERROR)类'App \ Providers \ View'未找到
- c# - $Expand 使用实体框架在 Webapi 可查询端点上失败
- scala - 如何在火花数据框中用\ N替换空字符串
- swift - 如何使用递归正确退出 House Robber 实施?
- python - Django 测试客户端获取返回 404 但是在 shell 和 Web 浏览器中工作
- javascript - Javascript sup() 不工作
- r - 在 s3 对象上调度 s4 方法
- typescript - ShouldJs Typescript 与 Promise 有问题
- css - 悬停和鼠标速度上的颜色过渡问题