首页 > 解决方案 > Kafka Streams - 抑制直到窗口结束(不关闭)

问题描述

我正在对窗口流执行聚合并希望抑制早期聚合结果。早期结果是指在窗口结束之前计算的结果,而不是在宽限期内发生的结果。因此,我想抑制所有带有时间戳<窗口结束的聚合结果,但转发所有时间戳> =窗口结束和时间戳<窗口关闭的记录。

最小的 Kafka Streams 拓扑示例:

new StreamsBuilder()
        .stream("my-topic")
        .windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
        .reduce(myReducer)
        .suppress( /* searched for*/ )
        .toStream();

因此,Suppressed.untilWindowCloses( .. )这对我来说不是一个选择,因为我必须等到宽限期到期,这可能很长。

根据KIP-328,可以使用 as 获得所需的行为Suppressed.untilTimeLimit(Duration.ZERO, .. )(引自 KIP 的描述):

一个。发射前等待更多更新的时间。这是一个时间量,从事件时间(对于常规 KTables)或从窗口结束(对于窗口化 KTables)测量,在将每个键发送到下游之前缓冲每个键。

然而, Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,并且时间限制在接收每个(窗口)键的第一条记录时开始倒计时,而不是在窗口结束时开始倒计时。

我很高兴对此进行澄清并支持如何实现所需的行为。

标签: apache-kafkaapache-kafka-streams

解决方案


KIP 描述不正确(我相应地更新了 wiki 页面)。请注意,KIP 的进一步下方说:

限速更新

假设我们希望将 KTable 的更新速率降低到每个键大约每 30 秒更新一次。我们不想为此使用太多内存,而且我们认为我们不会在任何时候更新超过 1000 个键。

table
  .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)))
  .toStream(); // etc.

因此,使用untilTimeLimit用于定期发射。对于窗口聚合,间隔计时器将在窗口开始时间开始 - 您仍然可以将等待期设置为“窗口大小”以不获得任何“早期”更新,但您不会在窗口结束后看到每个更新通过但只能看到“窗口大小间隔”中的更新。如果你的宽限期真的很长,这可能还不够好吗?

您描述的用例目前不受支持,但我认为这是一个非常有趣和有用的用例。也许您可以创建功能请求票?


推荐阅读