首页 > 解决方案 > 使用 Kafka Streams 修改嵌套的 JSON 字段

问题描述

是否可以在 Kafka Streams 的帮助下对嵌套 JSON 字段应用过滤器?如果是,如何处理这些字段?

例如,

{
 "before":{
    "id":1,
    "name":"abc"  
  },
 "after":{
    "id":1,
    "name":"xyz"
}

现在,如果在字段后修改了名称,我不想过滤它,但是名称以外的字段正在被修改,我想过滤该记录。谢谢你。

标签: jsonapache-kafkakafka-consumer-apiapache-kafka-streams

解决方案


配置的 Stream Serde 的反序列化器应该返回您的对象类型。然后,您可以像常规 Java 流一样进行过滤

stream.filter(yourMesssage -> compareCDCRecords(yourMessage.getBefore(), yourMessage.getAfter()))

推荐阅读