java.util.LinkedHashMap cannot be cast to class GenericEvent - objectmapper typeReference

Problem description

I am consuming messages from a Kafka topic using the JSON deserializer (Spring's JsonDeserializer). The generic message structure is as follows. Generic event:

{
  "id": "10000",
  "payload": {
     "id": 100,
     "attribute1": "hi",
     "attribute2": "hello"
  },
  "type": {
    "id" : 1,
    "name" : "A"
  }
}

Different types carry different payloads, and the payload structure varies accordingly, so I want to process the payload based on the type.
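One way to dispatch on the type (a sketch, not the question's code; the class and field names are illustrative) is to bind the envelope generically first, leaving the payload as a JsonNode, and then convert the payload once the type is known:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PayloadDispatch {

    // Illustrative per-type payload classes
    public static class TypeAPayload { public int id; public String attribute1; public String attribute2; }
    public static class TypeBPayload { public int id; public String name; }

    static final ObjectMapper MAPPER = new ObjectMapper();

    // Read the whole message as a tree, then convert "payload" based on "type.name".
    public static Object bindPayload(String json) throws Exception {
        JsonNode root = MAPPER.readTree(json);
        String type = root.path("type").path("name").asText();
        JsonNode payload = root.path("payload");
        switch (type) {
            case "A": return MAPPER.treeToValue(payload, TypeAPayload.class);
            case "B": return MAPPER.treeToValue(payload, TypeBPayload.class);
            default:  throw new IllegalArgumentException("Unknown type: " + type);
        }
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"id\":\"10000\",\"payload\":{\"id\":100,\"attribute1\":\"hi\",\"attribute2\":\"hello\"},"
                + "\"type\":{\"id\":1,\"name\":\"A\"}}";
        System.out.println(bindPayload(json).getClass().getSimpleName());
    }
}
```

The two-step read avoids guessing the payload type up front; the switch is the single place where the type-to-class mapping lives.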

My POJOs are as follows; in total I created 3 different payload types, each with its own payload POJO.

public class GenericEvent<T> {

    private int id;
    private T payload;
    private Type type;

}

Now I am converting it with the code below:

JsonNode jsonNode = objectMapper.readTree(messageFromKafka);
GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});

But the code throws java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class GenericEvent.
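For reference, the LinkedHashMap typically appears whenever Jackson has no concrete target type to bind to. A minimal sketch (assuming Jackson on the classpath; Envelope and Payload are stand-ins for the question's classes) showing the difference a fully parameterized TypeReference makes:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class GenericBindingDemo {

    public static class Envelope<T> { public int id; public T payload; }
    public static class Payload    { public int id; public String attribute1; }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"id\":10000,\"payload\":{\"id\":100,\"attribute1\":\"hi\"}}";

        // Payload type not specified: Jackson falls back to its untyped
        // representation, so payload ends up as a LinkedHashMap.
        Envelope<?> raw = mapper.readValue(json, new TypeReference<Envelope<Object>>() {});
        System.out.println(raw.payload.getClass().getSimpleName()); // LinkedHashMap

        // Fully parameterized: payload binds to the concrete class.
        Envelope<Payload> typed =
                mapper.readValue(json, new TypeReference<Envelope<Payload>>() {});
        System.out.println(typed.payload.getClass().getSimpleName()); // Payload
    }
}
```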

Can anyone help me resolve this?

Edit: the generic objects I already have are below.

 //Payload object - applicable for the different types A, B, C, D
     public class Payload {
  
           private int id;
           private String name;
           private String address;
           private String typeAAttribute1; //applicable for type A
           private String typeAAttribute2; //applicable for type A
           private String typeBAtribute1;  //applicable for type B
           private String typeABAtribute2; //applicable for types A, B
           private String typeCtribute1;   //applicable for type C
           private String typeABCAtribute1;//applicable for types A, B, C
            
     }
    

    Kafka consumer config: 
    ---------------------
   

     import org.springframework.kafka.support.serializer.JsonDeserializer;
    
        @Bean
        public ConcurrentKafkaListenerContainerFactory<Object, Object> reprocessListenerContainerFactory() {
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupid");
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");
    
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    
            factory.setRecordFilterStrategy(
                    (consumerRecord) -> {
                        try {
                            JsonNode jsonNode = objectMapper.readTree(consumerRecord.value().toString());
                            GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
                            log.info(
                                    "Retrieved the record {} from the partition {} with offset {}",
                                    consumerRecord.value(),
                                    consumerRecord.partition(),
                                    consumerRecord.offset());
                            //Process type A and B events
                            if (genericEvent.getType().getName().equalsIgnoreCase("A") || genericEvent.getType().getName().equalsIgnoreCase("B")) {
                                return false;
                            }
                            return true;
                        } catch (Exception ex) {
                            log.error("Error occurred: {}", ex.getStackTrace());
                            return true;
                        }
                    });
            return factory;
        }
    //Listener
    @KafkaListener(id = "MYREPROCESS-ID", topics = "reprocess-test",
            containerFactory = "reprocessListenerContainerFactory",
            autoStartup = "true")
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) throws JsonProcessingException {
        JsonNode jsonNode = objectMapper.readTree(consumerRecord.value());
        GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
        //I should identify the respective payload at runtime
        Payload payload = genericEvent.getPayload();
        if (genericEvent.getType().getName().equalsIgnoreCase("A")) {
            processPayload(payload);
        } else {
            processPayload(payload);
        }
    }

Tags: spring-kafka, objectmapper, jackson-databind

Solution


Something odd is going on here. Since you are using the Spring JsonDeserializer, you have to tell it what type to convert to; the properties are documented here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config
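For example, a sketch of that configuration (the property constants are from spring-kafka's JsonDeserializer; the package name is illustrative):

```java
// Tell Spring's JsonDeserializer which class to create for record values.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, GenericEvent.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events"); // illustrative package
```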

In that case, you will receive a ConsumerRecord<?, GenericEvent>.

If you want to receive a ConsumerRecord<String, String> and do the conversion yourself, you should use a StringDeserializer instead.
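A sketch of that alternative (converting in the listener; GenericEvent<Payload> assumes the question's class is parameterized on the payload type):

```java
// Consumer side: receive the raw JSON string.
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// Listener side: convert manually with a fully parameterized TypeReference
// so the payload binds to a concrete class instead of a LinkedHashMap.
@KafkaListener(topics = "reprocess-test")
public void onMessage(ConsumerRecord<String, String> record) throws JsonProcessingException {
    GenericEvent<Payload> event = objectMapper.readValue(
            record.value(), new TypeReference<GenericEvent<Payload>>() {});
    processPayload(event.getPayload());
}
```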

