apache-kafka - 将消息加入 Kafka Stream 后从流中清除消息
问题描述
我正在使用 Kafka Streams 通过键加入来自两个不同 Kafka 主题的两种不同类型的消息。我正在使用滑动时间窗口。此窗口策略保留来自流的信息,其类型数量与消息是否加入某些内容无关。
在输入流的吞吐量非常高的情况下,Kafka 为执行连接而创建的主题会快速增长,从而消耗大量磁盘空间。
加入后是否有可能从上述主题中清除消息?这样,我将假设一条消息最多与具有相同密钥的另一条消息连接一次。
非常感谢。
解决方案
更新
从 2.4.0 版本开始,您可以通过参数配置流-流连接StreamJoined
(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+到+加入)。
您可以创建一个WindowedStoreSupplier
通过Stores
工厂类,并在传递给该方法的StreamJoined
对象上指定供应商。join()
原始答案
until()
您可以通过参数减少保留时间:
stream1.join(stream2, JoinWindows.of(...).until(/*put retention time here*/);
指定的保留时间将用于本地存储以及基础更改日志主题。请注意,如果更改日志主题已经存在,则更改until()
不会更新主题配置——您需要手动更新主题配置。
推荐阅读
- python - 如何在不暂停 tkinter 中的整个窗口的情况下暂停特定功能?
- list - 为什么清除列表会清除第一个?
- php - 从字符串中的引号中提取时如何包含等号和引号?
- push-notification - 推送通知可行性
- spring - @RepositoryRestResource 将 findAll 覆盖为仅登录用户
- c# - 如何使用 Selenium 在两个 HTML5 画布元素之间拖动一个元素?
- material-ui - 测试材料 Ui (v5) 抽屉 - 无法读取 null 的属性“scrollTop”
- typescript - Typescript - 使用通用键扩展对象
- batch-file - 从查询“wmic”中读取答案
- amazon-web-services - Govcloud 中的 DNS