scala - Kafka:在 x 时间内没有更新时更新密钥
问题描述
在使用 Kafka 时,有没有办法在 x 时间内没有看到密钥后更新密钥?
就像是
records
.groupByKey
.windowedBy(
TimeWindows
.of(Duration.ofMinutes(5))
.grace(Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1))
).count()
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
).updateNotSeen(Duration.ofMinutes(30), (k) => (k, 0))
所以在这里,只要 30 分钟后没有看到记录,Kafka 就会发出一条新记录。(由假设的 updateNotSeen 完成。)
在我的搜索中,我发现了这个未解决的问题,如果它存在的话,它允许我以某种方式做到这一点,但我不知道我现在会怎么做。
解决方案
据我所知,这在 DSL(Java、Scala)中是不可能的。
但是,在提供开箱即用的此类功能之前,您可以使用Kafka Streams的处理器 API自己实现此类自定义功能。(例如,处理器 API 可以类似地用于实现自定义连接操作。)。在这种情况下,您不会使用表——它是一个仅限 DSL 的抽象——而是使用状态存储Processor
(表由状态存储支持,fwiw),它支持来自附加的 s 或s 的直接读写访问Transformer
。处理器和转换器支持标点符号来安排定期操作,类似于cron
. 在此类计划操作期间,您可以检查是否有任何记录(由其记录键标识)在过去 30 分钟内未看到更新,然后采取相应措施。
此外,知道您可以将处理器 API 和 DSL(您迄今为止一直在使用)结合起来也很有帮助。也就是说,您可以在大部分代码中继续使用 DSL,并且仅在需要的时间和地点“插入”上述处理器/转换器(来自处理器 API)。
希望这可以帮助!
推荐阅读
- makefile - 如何欺骗 Make 在子文件夹中动态运行命令
- azure-devops - 从 repo 构建特定的解决方案文件
- swift - 如何修复显示屏上的按钮
- python - Google OR-TOOLS VRP 以前的 OR-TOOLS 分配问题的问题
- postman - 如何在 Postman 中使用 pm.expect 断言来显示差异
- c++ - 使用 VSCode MACOS 构建时 GLFW 链接器命令失败(退出代码 1)
- reactjs - 如何记忆自定义钩子以提高性能
- javascript - 使用多个功能交换案例
- reactjs - React/Jest/Enzyme:在没有 Mocking 的情况下测试 useLocation Hook
- python - 使用 bbox_inches = 'tight' 时的 Matplotlib 缓冲区问题