java - spring-kafka Multiple @KafkaListener 具有不同的 Json 对象不起作用
问题描述
我有一个工厂,这对于我的四个不同的@KafkaListers 来说很常见,并且每个侦听器都应该从其各自的主题中使用自己的 JSON 对象。我无法让它工作,因为我得到异常说:他的类'com.abc.MyObject'不在受信任的包中:[java.util,java.lang]。如果您认为此类可以安全反序列化,请提供其名称。如果序列化仅由受信任的来源完成,您还可以启用全部信任 (*)。即使我在下面添加,我仍然得到相同的异常: config.put(JsonDeserializer.TRUSTED_PACKAGES, ""); 如果我不使用通用工厂并将其与一个特定对象一起使用,它对于该对象工作正常,但那样我将不得不为四个不同的 Kafkalisteners 创建四个工厂。
我的卡夫卡列表:
@KafkaListener( topics="number_1_event", groupId="abc-group", containerFactory="kafkaABCListenerContainerFactory")
public void consumeMyMessage(MyTopic1Class data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) int offsetId) {
// code here
}
@KafkaListener( topics="number_2_event", groupId="abc2-group", containerFactory="kafkaABCListenerContainerFactory")
/* Similar method signature here*/
@KafkaListener( topics="number_3_event", groupId="abc3-group", containerFactory="kafkaABCListenerContainerFactory")
@KafkaListener( topics="number_4_event", groupId="abc4-group", containerFactory="kafkaABCListenerContainerFactory")
配置:
@Bean
public ConsumerFactory<String, Object> abcdConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaABCListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(abcdConsumerFactory());
return factory;
}
我目前正在使用上述配置,并且如上所述出现信任包错误。如果我用 MyTopic1Class 或 MyTopic2Class 替换上述配置中的 Object,然后运行它,它将适用于该特定对象。请帮忙 !!
解决方案
你的意思是我可以使用
ConcurrentKafkaListenerContainerFactory<String, MyTopic1Class>
andConsumerFactory<String, MyTopic1Class> abcdConsumerFactory()
。当我的 number_1_event 主题收到 MyTopic1Class 时,此工厂签名将起作用。当主题 number_2_event 收到 MyTopic2Class 并使用相同的工厂签名时它会起作用吗
是的,它会工作,但使用起来更清洁
ConcurrentKafkaListenerContainerFactory<String, Object> abcdContainerFactory()
ConsumerFactory<String, Object> abcdConsumerFactory()
您的听众可以消费更窄的类型MyTopic1Class
等MyTopic2Class
。
工厂的泛型类型只是编译时的;它们在运行时被擦除。
推荐阅读
- java - 向 http 服务器发送了许多异步请求,但一次只处理几个请求
- r - 将 .csv 中的列作为“字符”类型而不是 R 中的整数导入
- php - Ubuntu 18.04编译php7.4
- google-sheets - 我可以设置 Google 表格以复制另一张表格中的所有内容,但具有特定字数的单元格除外?
- python - Simplifying != if statements
- java - MongoDB 3.6:当该字段被加密时如何按文本索引字段进行搜索?
- javascript - onlick 函数是否等待承诺?
- sql-server - 需要帮助以在 SQL Server 上启用变更数据捕获 (CDC)
- python - 如何在beautifulsoup中找到列表的父母
- javascript - 为什么子组件的 prop 只显示父组件的初始值?