spring-kafka - java.util.LinkedHashMap 无法转换为类 GenericEvent - objectmapper typeReference
问题描述
使用 json 反序列化器(spring commons)从 kafka 主题中消耗消息。通用消息结构如下。通用事件:
{
"id": "10000",
"payload": {
"id": 100
"attribute1": "hi",
"attribute2": "hello"
},
"type": {
"id" : 1,
"name" : "A"
}
}
不同类型有不同的payload,payload的结构也会有所不同。所以我想根据类型处理有效载荷。
我各自的 POJO 如下,总共创建了 3 个不同的有效载荷和各自的有效载荷 pojo。
GenericEvent {
private int id;
private T payload:
private Type type;
}
现在我正在使用下面的代码进行转换
JsonNode jsonNode = objectMapper.readTree("messagefromKafka);
GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
但是代码抛出 java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class GenericEvent 。
有人可以帮助解决这个问题吗?
编辑://我已经提供的通用对象
//Payload Object - applicable for different types - A, B, C, D
public class Payload {
private int id;
private String name;
private String address;
private String typeAAttribute1; //applicable for type A attribute
private String typeAAttribute2; //applicable for type A attribute
private String typeBAtribute1; //applicable for type B attribute
private String typeABAtribute2; //applicable for type A,B attibute
private String typeCtribute1; //applicable for type C attibute
private String typeABCAtribute1;//applicable for type A,B,C attibute
}
Kafka consumer config:
---------------------
import org.springframework.kafka.support.serializer.JsonDeserializer;
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> reprocessListenerContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapservers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupid");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300000");
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setRecordFilterStrategy(
(consumerRecord) -> {
try {
JsonNode jsonNode = objectMapper.readTree(consumerRecord.value().toString());
GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
log.info(
"Retrieved the record {} from the partition {} with offset {}",
consumerRecord.value(),
consumerRecord.partition(),
consumerRecord.offset());
//Process type A and B events
if (genericEvent.getType().equalIgnoreCase("A") || genericEvent.getType().equalIgnoreCase("B"))) {
return false;
}
return true;
} catch (Exception ex) {
log.error("Error occured:{}", ex.getStackTrace());
return true;
}
});
return factory;
}
//Listener
@KafkaListener(id = "MYREPROCESS-ID", topics = "reprocess-test",
containerFactory = "reprocessListenerContainerFactory",
autoStartup = "true")
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
JsonNode jsonNode = objectMapper.readTree("messagefromKafka);
GenericEvent genericEvent = objectMapper.convertValue(jsonNode, new TypeReference<GenericEvent>() {});
//I should identify the respective payload during runtime
Payload payload = genericEvent.getPayload();
if (genericEvent.getType().equalsIgnoreCase("A") {
processPayload(payload);
} else {
processPayload(payload);
}
}
解决方案
有什么奇怪的。由于您使用的是 Spring JsonDeserializer
,因此您必须告诉它要转换为什么;属性记录在这里https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-json-config)。
在这种情况下,你会得到ConsumerRecord<?, GenericEvent>
.
如果您想自己接收ConsumerRecord<String, String>
并进行转换,则应使用StringDeserializer
s 代替。
推荐阅读
- amazon-web-services - 在 Athena 中查询表时获取 HIVE_CURSOR_ERROR
- python - tkinter 滚动条在调整窗口大小之前处于非活动状态
- rust - 将 TaskManager 脚本从 gdscript/Godot 迁移到 bevy 和 rust
- python-3.x - 如何在 python 3 中通过查看断言正则表达式来处理可变长度数字序列
- python - Python获取Speech to text语音音频数据
- python - SQL忽略带有WHERE IN UNNEST子句的空数组
- react-native - IdentityServer4 从 Google 等外部提供商注销
- javascript - 如何计算数组和 console.log 中每个重复元素的数量?
- python - 从 Python 中的嵌套列表中获取和分配变量中的元素
- css - 样式化数据表“搜索”