apache-kafka - Kafka Streams - 抑制直到窗口结束(不关闭)
问题描述
我正在对窗口流执行聚合并希望抑制早期聚合结果。早期结果是指在窗口结束之前计算的结果,而不是在宽限期内发生的结果。因此,我想抑制所有带有时间戳<窗口结束的聚合结果,但转发所有时间戳> =窗口结束和时间戳<窗口关闭的记录。
最小的 Kafka Streams 拓扑示例:
new StreamsBuilder()
.stream("my-topic")
.windowedBy(TimeWindows.of(myWindowSize).grace(myGracePeriod))
.reduce(myReducer)
.suppress( /* searched for*/ )
.toStream();
因此,Suppressed.untilWindowCloses( .. )
这对我来说不是一个选择,因为我必须等到宽限期到期,这可能很长。
根据KIP-328,可以使用 as 获得所需的行为Suppressed.untilTimeLimit(Duration.ZERO, .. )
(引自 KIP 的描述):
一个。发射前等待更多更新的时间。这是一个时间量,从事件时间(对于常规 KTables)或从窗口结束(对于窗口化 KTables)测量,在将每个键发送到下游之前缓冲每个键。
然而, Kafka Streams JavaDoc以及相应的实现意味着情况并非如此,并且时间限制在接收每个(窗口)键的第一条记录时开始倒计时,而不是在窗口结束时开始倒计时。
我很高兴对此进行澄清并支持如何实现所需的行为。
解决方案
KIP 描述不正确(我相应地更新了 wiki 页面)。请注意,KIP 的进一步下方说:
限速更新
假设我们希望将 KTable 的更新速率降低到每个键大约每 30 秒更新一次。我们不想为此使用太多内存,而且我们认为我们不会在任何时候更新超过 1000 个键。
table .suppress(untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000))) .toStream(); // etc.
因此,使用untilTimeLimit
用于定期发射。对于窗口聚合,间隔计时器将在窗口开始时间开始 - 您仍然可以将等待期设置为“窗口大小”以不获得任何“早期”更新,但您不会在窗口结束后看到每个更新通过但只能看到“窗口大小间隔”中的更新。如果你的宽限期真的很长,这可能还不够好吗?
您描述的用例目前不受支持,但我认为这是一个非常有趣和有用的用例。也许您可以创建功能请求票?
推荐阅读
- arrays - 数组字符作为c中的参数
- docker - Logstash 在 Kubernetes 中保留消息几个小时
- flutter - 如何停止 ColorFiltered 混合?
- laravel-8 - Jetstream 用户配置文件更新:使用 Livewire 表单输入传递 $input[] 字段时出现问题
- javascript - 为什么这个 javascript while 循环会挂起?
- python - 将多个类分配给具有数字数据的列中的新变量
- android-studio - SharedPreference 没有保存最后添加的 stringSet
- html - Html / CSS:图像斜切屏幕的一半
- windows - 根据点距离调整图像大小
- python - 使用 .loc 从 pandas 请求数据