首页 > 解决方案 > KafkStreams:在窗口期间丢弃消息

问题描述

需要在一个时间窗口内丢弃重复的消息。消息不断传入。波纹管是代码的一部分。

 kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
             .reduce((k,m) -> m)
             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
             .toStream()
             .foreach((k, v) -> doSomeProcess(k,v));

我在这里做错了什么。我没有看到对方法 doSomeProcess 的任何调用。消息进来了。

标签: apache-kafka-streams

解决方案


原来“此功能需要为 Windows 添加“宽限期”参数”来自https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+ KTables .... .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ofSeconds(5)) ) ....

这解决了这个问题。


推荐阅读