首页 > 解决方案 > 将消息加入 Kafka Stream 后从流中清除消息

问题描述

我正在使用 Kafka Streams 通过键加入来自两个不同 Kafka 主题的两种不同类型的消息。我正在使用滑动时间窗口。此窗口策略保留来自流的信息,其类型数量与消息是否加入某些内容无关。

在输入流的吞吐量非常高的情况下,Kafka 为执行连接而创建的主题会快速增长,从而消耗大量磁盘空间。

加入后是否有可能从上述主题中清除消息?这样,我将假设一条消息最多与具有相同密钥的另一条消息连接一次。

非常感谢。

标签: apache-kafkaapache-kafka-streams

解决方案


更新

从 2.4.0 版本开始,您可以通过参数配置流-流连接StreamJoined(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+到+加入)。

您可以创建一个WindowedStoreSupplier通过Stores工厂类,并在传递给该方法的StreamJoined对象上指定供应商。join()

原始答案

until()您可以通过参数减少保留时间:

stream1.join(stream2, JoinWindows.of(...).until(/*put retention time here*/);

指定的保留时间将用于本地存储以及基础更改日志主题。请注意,如果更改日志主题已经存在,则更改until()不会更新主题配置——您需要手动更新主题配置。


推荐阅读