Serializing/deserializing generic types in Spring Cloud Kafka Streams

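Problem description

The goal is to read a stream from one topic, apply some transformations, and then send two kinds of events to other topics. To do this we use the KStream.branch() function together with the functional programming model. The code is:

Input POJO: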

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class FooInput {

    @JsonProperty("field1")
    private String field1;

    @JsonProperty("field2")
    private String field2;
}

Output POJO:

@Getter
@Setter
@ToString
@EqualsAndHashCode
public class FooEvent<T> extends EventInfo {

    @JsonProperty(value = "entity")
    private T entity;

    @Builder
    private FooEvent(T entity, String eventId, OffsetDateTime eventTime, Action eventAction, String eventSourceSystem, String eventEntityName) {
        super(eventId, eventTime, eventAction, eventSourceSystem, eventEntityName);
        this.entity = entity;
    }

    public FooEvent() {
        super();
    }

}

@Setter
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public abstract class EventInfo {

    @JsonProperty(value = "eventId")
    private String eventId;

    @JsonProperty(value = "eventTime")
    private OffsetDateTime eventTime;

    @JsonProperty(value = "eventAction")
    private Action eventAction;

    @JsonProperty(value = "eventSourceSystem")
    private String eventSourceSystem;

    @JsonProperty(value = "eventEntityName")
    private String eventEntityName;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Bar {

    @JsonProperty("field1")
    private String field1;

    @JsonProperty("field2")
    private String field2;

    @JsonProperty("field3")
    private String field3;
}

Processor function:

    @Bean
    public Function<KStream<String, FooInput>, KStream<String, FooEvent<Bar>>[]> process() {

        Predicate<String, FooEvent<Bar>> predicate1 =
            (key, value) -> value.getEntity().getField1().equalsIgnoreCase("test1");
        Predicate<String, FooEvent<Bar>> predicate2 =
            (key, value) -> value.getEntity().getField1().equalsIgnoreCase("test2");

        return input -> {
            // The block lambda must return the branched array of streams.
            return input
                ...
                .branch(predicate1, predicate2);
        };
    }

The bindings are declared in application.properties:

Input:

spring.cloud.stream.bindings.process-in-0.destination=topic0
spring.cloud.stream.bindings.process-in-0.content-type=application/json

Output:

spring.cloud.stream.bindings.process-out-0.destination=topic1
spring.cloud.stream.bindings.process-out-0.content-type=application/json

spring.cloud.stream.bindings.process-out-1.destination=topic2
spring.cloud.stream.bindings.process-out-1.content-type=application/json

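The problem shows up when the application evaluates the predicates. It appears to try to convert the value to FooEvent<Bar>. The eventId, eventTime, eventAction, ... fields are converted correctly, but the entity field (a Bar in this case) ends up stored as a HashMap instead of a new Bar object with the correct fields set. This leads me to believe that Spring's default Serde (JsonSerde) is doing something wrong. Any suggestions on how to solve this generic-type Serde problem in Kafka Streams?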

Tags: apache-kafka, jackson, spring-cloud, apache-kafka-streams, spring-cloud-stream-binder-kafka

Solution
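
A plausible cause of the HashMap symptom is Java type erasure: if the JSON is read with only the raw class FooEvent as the target, nothing tells the mapper that T is Bar, and Jackson then binds a JSON object of unknown type to a map. The sketch below uses only the JDK to show the difference between what the raw class exposes and what a "super type token" exposes. The names ErasureDemo and TypeToken, and the trimmed-down FooEvent and Bar, are hypothetical stand-ins for illustration, not the classes from the question or any library API.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

class ErasureDemo {

    // Hypothetical, trimmed-down stand-ins for the question's classes.
    static class Bar { String field1; }
    static class FooEvent<T> { T entity; }

    // Super type token: an anonymous subclass records its type argument in
    // class metadata, where reflection can still read it at runtime.
    static abstract class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
            this.type = superType.getActualTypeArguments()[0];
        }

        Type getType() { return type; }
    }

    // What can be learned about 'entity' from the raw class alone:
    // only the type variable, not the concrete type.
    static String entityTypeFromRawClass() {
        try {
            return FooEvent.class.getDeclaredField("entity")
                    .getGenericType().getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // What can be learned from a token that wraps a parameterized type
    // such as FooEvent<Bar> (a non-parameterized token would fail the cast).
    static String entityTypeFromToken(TypeToken<?> token) {
        ParameterizedType full = (ParameterizedType) token.getType();
        return full.getActualTypeArguments()[0].getTypeName();
    }

    public static void main(String[] args) {
        // Raw class: the field type is just the type variable.
        System.out.println(entityTypeFromRawClass());
        // Token: the concrete Bar type is recoverable.
        System.out.println(entityTypeFromToken(new TypeToken<FooEvent<Bar>>() {}));
    }
}
```

Jackson exposes the same idea as TypeReference, and spring-kafka's JsonSerde has a constructor that accepts one, so something like new JsonSerde<>(new TypeReference<FooEvent<Bar>>() {}) may be worth trying for the affected value type. Whether the binder picks that Serde up depends on how it is registered and on the steps elided in the processor, neither of which the question shows.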

