首页 > 解决方案 > Kafka:在 x 时间内没有更新时更新密钥

问题描述

在使用 Kafka 时,有没有办法在 x 时间内没有看到密钥后更新密钥?

就像是

records
    .groupByKey
    .windowedBy(
         TimeWindows
         .of(Duration.ofMinutes(5))
         .grace(Duration.ofMinutes(1))
         .advanceBy(Duration.ofMinutes(1))
    ).count()
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
    ).updateNotSeen(Duration.ofMinutes(30), (k) => (k, 0))

所以在这里,只要 30 分钟后没有看到记录,Kafka 就会发出一条新记录。(由假设的 updateNotSeen 完成。)

在我的搜索中,我发现了这个未解决的问题,如果它存在的话,它允许我以某种方式做到这一点,但我不知道我现在会怎么做。

标签: scalaapache-kafkaapache-kafka-streams

解决方案


据我所知,这在 DSL(Java、Scala)中是不可能的。

但是,在提供开箱即用的此类功能之前,您可以使用Kafka Streams的处理器 API自己实现此类自定义功能。(例如,处理器 API 可以类似地用于实现自定义连接操作。)。在这种情况下,您不会使用表——它是一个仅限 DSL 的抽象——而是使用状态存储Processor(表由状态存储支持,fwiw),它支持来自附加的 s 或s 的直接读写访问Transformer。处理器和转换器支持标点符号来安排定期操作,类似于cron. 在此类计划操作期间,您可以检查是否有任何记录(由其记录键标识)在过去 30 分钟内未看到更新,然后采取相应措施。

此外,知道您可以将处理器 API 和 DSL(您迄今为止一直在使用)结合起来也很有帮助。也就是说,您可以在大部分代码中继续使用 DSL,并且仅在需要的时间和地点“插入”上述处理器/转换器(来自处理器 API)。

希望这可以帮助!


推荐阅读