首页 > 解决方案 > 将 Kafka 有效负载转换为对象

问题描述

我有一个关于 kafka 中的反序列化的问题。

我在我的 kafka 应用程序中收到了这个字符串

Payload: {"post":{"postId":"5e22fac7f7356803e8784172","tags":["a","lovve","asldkjfsbajfdkjlnzx","z"]},"date":"2020-01-18T16:12:50.833423","user":{"userId":"5dfcfd77367c690edd91b2d9"},"reactionType":"unloved"}

我的卡夫卡中有这个配置

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(
            kafkaProperties.buildConsumerProperties());

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "magpie-trending");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

并且我有一个具有相同有效负载代码的类,我如何随时在我的对象中转换它?

所以我也有这种方法

大家好,我有一个微服务接收到这个字符串并且正在工作,但是我需要将此字符串转换为一个特定的对象,并且当我使用 ObjectMapper 转换应用程序时返回此异常:

抛出异常;嵌套异常是 com.fasterxml.jackson.databind.exc.InvalidDefinitionException:无法构造实例 com.avenuecode.magpie.trending.component.kafka.message.PostMessage (没有创建者,如默认构造,存在):无法从对象值反序列化(没有基于委托或属性的创建者)

所以我的对象是:

public class ReactionMessage {
 private PostMessage post;
 private String date;
 private UserMessage user;
 private String reactionType;

 @JsonCreator
 public ReactionMessage(@JsonProperty("post") PostMessage post,
                       @JsonProperty("user") UserMessage user,
                       @JsonProperty("date") String date,
                       @JsonProperty("reactionType") String reactionType) {
    this.post = post;
    this.date = date;
    this.user = user;
    this.reactionType = reactionType;
 }

 @JsonCreator
 public ReactionMessage() {
 }
}

当我调用映射器时,使用此方法:

  @KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
  public void listenAsObject(ConsumerRecord<String, String> cr,
                           @Payload String payload) throws IOException {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());

    ObjectMapper mapper = new ObjectMapper();
    ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);

}

   private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}

标签: javaspringspring-bootspring-kafka

解决方案


你需要让你的JsonDeserializer回报你期望的对象类型

例如,Kafka 内置的 JSON 反序列化器只返回JsonNode.

Spring JSON 反序列化器有额外的属性来传递类名

Kafka - 反序列化消费者中的对象


推荐阅读