首页 > 解决方案 > spring-kafka Multiple @KafkaListener 具有不同的 Json 对象不起作用

问题描述

我有一个工厂,这对于我的四个不同的@KafkaListers 来说很常见,并且每个侦听器都应该从其各自的主题中使用自己的 JSON 对象。我无法让它工作,因为我得到异常说:他的类'com.abc.MyObject'不在受信任的包中:[java.util,java.lang]。如果您认为此类可以安全反序列化,请提供其名称。如果序列化仅由受信任的来源完成,您还可以启用全部信任 (*)。即使我在下面添加,我仍然得到相同的异常: config.put(JsonDeserializer.TRUSTED_PACKAGES, ""); 如果我不使用通用工厂并将其与一个特定对象一起使用,它对于该对象工作正常,但那样我将不得不为四个不同的 Kafkalisteners 创建四个工厂。

我的卡夫卡列表:

@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
            @Header(KafkaHeaders.OFFSET) int offsetId) {
                // code here
            }



@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/

@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")

@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")

配置:

@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");


    return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
            new JsonDeserializer<>(Object.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();      
    factory.setConsumerFactory(abcdConsumerFactory());

    return factory;
    }

我目前正在使用上述配置,并且如上所述出现信任包错误。如果我用 MyTopic1Class 或 MyTopic2Class 替换上述配置中的 Object,然后运行它,它将适用于该特定对象。请帮忙 !!

标签: javaspring-bootapache-kafkaspring-kafka

解决方案


你的意思是我可以使用ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class>and ConsumerFactory<String, MyTopic1Class> abcdConsumerFactory()。当我的 number_1_event 主题收到 MyTopic1Class 时,此工厂签名将起作用。当主题 number_2_event 收到 MyTopic2Class 并使用相同的工厂签名时它会起作用吗

是的,它会工作,但使用起来更清洁

ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()

您的听众可以消费更窄的类型MyTopic1ClassMyTopic2Class

工厂的泛型类型只是编译时的;它们在运行时被擦除。


推荐阅读