首页 > 解决方案 > 如何从 KTable 中删除旧密钥

问题描述

我有一个生产者,它将密钥为 A 或 B 的记录发布到 Kafka 主题。

在流应用程序中,我将键 A 的每个记录平面映射到键 U、V 或 W 的记录,并将键 B 的每个记录映射到键 X、Y 或 Z 的记录。

每个平面地图操作创建的记录数各不相同。例如,具有键 A 的特定记录可能映射到具有键 U 的一个记录,但另一个可能映射到键 V 和键 W 之一。

我想为这些平面映射记录创建一个 KTable。但是,我希望此 KTable 的键与键 A 和 B 的每条记录的最后一个平面映射操作生成的键相匹配。

例如,如果在某个特定时刻,KTable 具有 X、Y 和 U 键,然后发布了具有键 B 的记录并仅映射到具有键 X 的一条记录,我希望 KTable 仅具有 X 的键和你。

如果有人对我如何做到这一点有任何建议,我将不胜感激。

标签: javaapache-kafkaapache-kafka-streamsspring-kafka

解决方案


flatMap()将需要有状态,即,您可以使用flatTransfrom()状态存储来实现它。我们可以通过运算符将结果flatTransform()插入到结果中(在 2.5 版本中添加)。KTableKStream#toTable()

键值存储维护来自输入主题的原始数据。每次键更改时,您现在都可以访问键的旧记录(来自存储)和新记录(输入到tranform()),并且可以发出相应的记录以更新下游KTable


推荐阅读