首页 > 解决方案 > Kafka-Streams - 在加入之前过滤 GlobalKTable

问题描述

您能否就以下问题的处理方法给我一个建议。我有两个主题,一个是静态内容,第二个是数据流。任务是加入数据,这在正常情况下很容易。我会将静态内容读取为 GlobalKTable,将动态内容读取为 KStream,然后简单地加入它们。问题是查找数据存在于同一主题的多个版本中。“版本”由字段“validFrom”标识。因此流的数据需要根据时间戳与对应版本的查找数据相结合。有没有办法过滤 GlobalKTable 中的数据?

最好的问候马丁

标签: joinapache-kafkaapache-kafka-streams

解决方案


You cannot apply a filter operation on GlobalKTable itself but you could try to test the version of the records in the ValueJoiner and set the values of the join result records that do not pass the test to null. After the join, you can apply a filter that filters out all records that have a value that is null.


推荐阅读