首页 > 解决方案 > 如何从spring-cloud-stream-kafka-stream中的avro模式名称重定向类名?

问题描述

我有一个消费 Kafka 主题的消费者应用程序。当生产者通过 Avro 发布类类型的记录时,它会按照模式注册表上的模式Reservation分配模式名称。my_reservations

在我的消费者应用程序中,我将配置设置为

default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

和处理器作为

@Bean
    public java.util.function.Consumer<KStream<String, Reservation>> process() {

        return input.foreach((key, value) -> {
                    System.out.println(" Value: " + value);
                });
    }

但是我从消费者那里得到了错误,因为Reservation类路径是reservation.Reservation.

Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class reservation.my_reservations specified in writer's schema whilst finding reader's schema for a SpecificRecord.

我该如何解决?我正在使用官方文档中提到的功能方式来创建消费者。

标签: javaapache-kafkaspring-cloud-stream

解决方案


推荐阅读