java - 使用kafka流在时间窗口中获取给定键的最后一个事件
问题描述
我开始使用 KStream 来使用来自现有主题的数据。
我只对在 10 秒窗口内获取给定 ID 的最后一个事件感兴趣。我尝试使用以下代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
但我最终得到了所有的事件,而不仅仅是最后一个。是否可以使用 KStream 做我想做的事?
解决方案
采用.suppress()
它抑制了窗口中的所有中间结果,只发出最终结果。
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(unbounded()))) // like this
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
你可以在这里阅读更多:https ://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
推荐阅读
- r - 如何在 R 中删除网格线并为 plotly 提供清晰的背景?
- php - 安装我的主题“listeo”时,我找到了这个错误解决方案
- postgresql - postgresql 仅从 JSONB 数据类型中选择一些子键
- c++ - 将 .bin 文件中的二进制数据读取到 C++ 中的结构中
- python - 在 select(query) sqlite3 python 中是否有任何符号或方法可以返回列中的所有数据?
- c - 在 C 中使用链表进行堆栈 - 显示功能不会停止
- excel - Excel Power Query:处理未显示的数据
- python - Python Webdriver-Manager 在 MXLinux 上显示未知问题
- google-app-engine - 如何永久重定向应用引擎服务?
- javascript - Javascript:将函数与字符串连接起来