spring-boot - SprintBoot kafka 值序列化器
问题描述
我有一个带有 apache kafka(一个开源流处理软件)的 SpringBoot 项目我有这个监听器
@KafkaListener(topics = "test")
public String consume(Hostel hostel) throws IOException {
}
这个序列化器
public class HostelSerializer implements Deserializer<Hostel> {
private final ObjectMapper objectMapper;
public InputRequestMessageSerializer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public void configure(Map<String, ?> map, boolean b) {
this.configure(map,b);
}
@SneakyThrows
@Override
public Hostel deserialize(String s, byte[] bytes) {
return objectMapper.readValue(bytes, Hostel.class);
}
@Override
public void close() {
this.close();
}
}
在属性中:
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.kafka.config.HostelSerializer
但是当我收到一条消息时,我有这个错误
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String]
to [com.message.Hostel] for GenericMessage [.....}]
查看日志它没有从属性中获取值:
2020-04-09 16:41:30,840 INFO : gid: trace= span= [main] o.a.k.c.consumer.ConsumerConfig ConsumerConfig values:
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
解决方案
你混合反/序列化。由于您配置了消费者,因此您只需要使用正确的反序列化接口和实现:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: group_id
topics: test
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.kafka.config.HostelDeserializer
... 和 ...
public class HostelDeserializer implements Deserializer<Hostel> { .. }
推荐阅读
- apache-kafka - Kafka 分区中的时间戳序列
- python - 列表操作中的 Python 列表
- javascript - 如何更改 vscode 的活动文本编辑器?
- python - 如何将 mark_geoshape 缩放到 Altair 中的特定区域?
- javascript - 使用 javascript 打印动态代码,该代码根据参数添加了一个方法
- python - 如果使用 Python 从负值变为正值,则在列表中查找连续值的索引
- java - 获取每个数组中不存在的元素
- javascript - 基于其他总计 Vanilla JavaScript 计算总计
- java - 将 Mat 对象转换为 Float32[1,66,200,3] android studio
- javascript - 未使用 process.exit() 从节点连接到 sqlite db