首页 > 解决方案 > Spring Cloud 流反序列化不同的 pojo

问题描述

我正在使用 Kstream 来使用 kafka 消息并将其保存到我的数据库中。这些消息属于不同的 pojo,目前我正在使用对象映射器来创建对象,然后将它们保存在数据库中。我读过可以使用 Jsondeserialzerserde,但我不确定如何使用它映射到不同的 pojo。为每个 pojo 定制一个 serde 是没有意义的。请帮忙 。提前致谢。

这是我的代码:

    public Consumer<KStream<String, String>> process() {

        return input ->
                inpu.foreach((key, value) -> {
                    ObjectMapper mapper = new ObjectMapper();
                    try {

                        if(value.contains("Teacher"))
                        {
                            Teacher teacher= mapper.readValue(value,Teacher.class);
                            teacherRepository.save(teacher);
                        }
                        else if(value.contains("Student"))
                        {
                            Student student= mapper.readValue(value,Student.class);
                            studentRepository.save(student)
                        }
                        else  if(value.contains("Principal"))
                        {
                            Principal principal= mapper.readValue(value,Principal.class);
                            principalRepository.save(Principal);
                        }
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }

                });    }

}

标签: javaspringapache-kafkaapache-kafka-streamsspring-cloud-stream

解决方案


Spring Cloud Stream 中的 Kafka Streams binder 不直接提供任何机制来执行您所要求的操作。您的消费者的类型签名表明您将其作为 使用String,因此绑定器可以在您不明确提供任何 的情况下推断该信息Serde。但是,如果您想String使用 jackson 进一步将其转换为其他类型,您需要在业务逻辑中自行完成,因为您已经拥有它。如果您只有几个有限类型,我认为这样做没有问题。


推荐阅读