apache-kafka-streams - KafkStreams:在窗口期间丢弃消息
问题描述
需要在一个时间窗口内丢弃重复的消息。消息不断传入。波纹管是代码的一部分。
kStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.reduce((k,m) -> m)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach((k, v) -> doSomeProcess(k,v));
我在这里做错了什么。我没有看到对方法 doSomeProcess 的任何调用。消息进来了。
解决方案
原来“此功能需要为 Windows 添加“宽限期”参数”来自https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+ KTables .... .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ofSeconds(5)) ) ....
这解决了这个问题。
推荐阅读
- javascript - 当用户让计算机进入睡眠状态时,将用户从网站中注销
- javascript - 查找子字符串的第一个和最后一个索引
- python - 在 Python 中调用模拟方法的类的测试实例
- python - pytest 在没有 pytest.ini 文件的情况下从 shell 调用时忽略 venv
- sql - SQL优化分享邮编和号码
- node.js - 在 Node.js 中,为什么管道到 child_process.stdout 到 process.stdout 会去除 ANSI 颜色?
- datatables - 获取计数最高的项目
- amazon-web-services - 如何在 Kubernetes 提供商计划中引用 AWS 提供商计划的输出?
- python - C++ 从向量效率问题中删除
- java - 在给定时间内更改类的属性