apache-kafka - 在 Spring Cloud Kafka Streams 中序列化/反序列化泛型类型
问题描述
主要目的是从一个主题中读取一个流,应用一些转换,然后将两个事件发送到其他主题。为此,我们使用 Kstream.branch() 函数并使用函数式编程。代码是:
输入 POJO:
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class FooInput {
@JsonProperty("field1")
private String field1;
@JsonProperty("field2")
private String field2;
}
输出 POJO:
@Getter
@Setter
@ToString
@EqualsAndHashCode
public class FooEvent<T> extends EventInfo {
@JsonProperty(value = "entity")
private T entity;
@Builder
private FooEvent(T entity, String eventId, OffsetDateTime eventTime, Action eventAction, String eventSourceSystem, String eventEntityName) {
super(eventId, eventTime, eventAction, eventSourceSystem, eventEntityName);
this.entity = entity;
}
public FooEvent() {
super();
}
}
@Setter
@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public abstract class EventInfo {
@JsonProperty(value = "eventId")
private String eventId;
@JsonProperty(value = "eventTime")
private OffsetDateTime eventTime;
@JsonProperty(value = "eventAction")
private Action eventAction;
@JsonProperty(value = "eventSourceSystem")
private String eventSourceSystem;
@JsonProperty(value = "eventEntityName")
private String eventEntityName;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class Bar {
@JsonProperty("field1")
private String field1;
@JsonProperty("field2")
private String field2;
@JsonProperty("field3")
private String field3;
}
处理器功能:
@Bean
public Function<KStream<String, FooInput>, KStream<String, FooEvent<Bar>>[]> process() {
Predicate<String, FooEvent<Bar>> predicate1=
(key, value) -> value.getEntity().getField1().equalsIgnoreCase("test1");
Predicate<String, FooEvent<Bar>> predicate2=
(key, value) -> value.getEntity().getField1().equalsIgnoreCase("test2");
return input -> {
input
...
.branch(predicate1, predicate2);
};
}
绑定在 appplication.properties 中声明:
输入:
spring.cloud.stream.bindings.process-in-0.destination=topic0
spring.cloud.stream.bindings.process-in-0.content-type=application/json
输出:
spring.cloud.stream.bindings.process-out-0.destination=topic1
spring.cloud.stream.bindings.process-out-0.content-type=application/json
spring.cloud.stream.bindings.process-out-1.destination=topic2
spring.cloud.stream.bindings.process-out-1.content-type=application/json
问题在于应用程序何时评估谓词。它似乎试图转换为FooEvent<Bar>
. 它可以很好地转换eventId
, eventTime
, eventAction
, ... 字段,但是当涉及到entity
字段(在这种情况下Bar
)时,它将值存储在 HashMap 上(而不是创建新Bar
对象并设置正确的字段),这让我相信Spring 默认 Serde (JsonSerde) 做错了什么。有关如何解决 Kafka Streams 中的泛型类型 Serde 问题的任何建议?
解决方案
推荐阅读
- reactjs - 如何使用这个类添加不同的事件 |REACT JS|
- typescript - Typescript: Nested Typings - 寻找更优雅的解决方案
- java - 为什么我在 Java 中收到“找不到类”错误?
- java - 在使用 xml 映射器序列化 java bean 时从肥皂 Web 服务响应中屏蔽敏感数据
- html - 使用 flexbox 自定义画廊布局
- android - 默认值未设置为 TextInputLayout Android XML
- cron - 从 AppleScript 制作 cronjob
- android - Android Sqlite 中的 Select 查询中的自定义对象
- activemq-artemis - Apache ActiveMQ Artemis 如何调查消息是否丢失?
- ssms - 有没有办法使用 saltStack 创建 SQL Server xp_cmdshell_proxy_account