首页 > 解决方案 > 如何控制聚合状态存储和更改日志主题的保留

问题描述

我的用例如下:订单通过主题流入激活系统。我必须识别相同键记录的更改。我使用聚合函数将现有值与新值进行比较,并输出一个事件,指出已识别的更改类型,即 DueDate Change。

密钥是一个随机生成的数字,唯一密钥的数量几乎不受限制。如果订购系统将修订推送到现有订单,则将重复使用相同的密钥。

该代码已经在生产环境中运行了几个月,但状态存储和更改日志主题正在增长,并且存在空间使用问题。我希望记录在国营商店 90 天后过期。我读到了在状态存储上应用基于时间的保留的方法,看起来聚合窗口化是实现这一目标的一种方式。

我了解窗口聚合仅适用于翻滚和跳跃窗口。滑动窗口仅可用于连接操作。

在这种情况下,翻转窗口不起作用,因为我将有 0-90、90-180 的窗口,并且我无法识别第 92 天的更新以获取第 89 天进入的记录(他们不会共享同一个窗口)。

现在唯一的其他选择是跳窗。

TimeWindows timeWindow = TimeWindows.of(90days).advanceBy(1day).until(1day);

问题是我必须坚持并更新 90 个窗口。流开始时,将创建 90 个窗口 0-90、1-91、2-92、3-93 等。如果我在窗口上保留 1 天,则窗口 0-90 将在当天清理91.

现在让我们说在第 90 天我得到了更新。如果我错了,请纠正我,但我的理解是我将不得不更新 90 个窗口,并且由于所有重复项,到那时我的状态存储将非常大。也许这就是我遗漏的地方。如果一条记录存在于 90 个窗口中,它是否在磁盘上物理写入了 90 次?

最后,我需要的只是防止我的状态存储和更改日志主题无限增长。90 天的历史数据足以支持我的用例。

有没有更好的方法来解决这个问题?

标签: apache-kafkaapache-kafka-streams

解决方案


不使用 DSL 而是使用带有窗口状态存储的处理器 API 可能更简单。窗口状态存储只是一个具有过期时间的键值对存储。因此,您可以像使用键值存储一样使用它——您只需提供一个额外的时间戳,最终将用于使数据过期。


推荐阅读