java - 如何从 KTable 中删除旧密钥
问题描述
我有一个生产者,它将密钥为 A 或 B 的记录发布到 Kafka 主题。
在流应用程序中,我将键 A 的每个记录平面映射到键 U、V 或 W 的记录,并将键 B 的每个记录映射到键 X、Y 或 Z 的记录。
每个平面地图操作创建的记录数各不相同。例如,具有键 A 的特定记录可能映射到具有键 U 的一个记录,但另一个可能映射到键 V 和键 W 之一。
我想为这些平面映射记录创建一个 KTable。但是,我希望此 KTable 的键与键 A 和 B 的每条记录的最后一个平面映射操作生成的键相匹配。
例如,如果在某个特定时刻,KTable 具有 X、Y 和 U 键,然后发布了具有键 B 的记录并仅映射到具有键 X 的一条记录,我希望 KTable 仅具有 X 的键和你。
如果有人对我如何做到这一点有任何建议,我将不胜感激。
解决方案
您flatMap()
将需要有状态,即,您可以使用flatTransfrom()
状态存储来实现它。我们可以通过运算符将结果flatTransform()
插入到结果中(在 2.5 版本中添加)。KTable
KStream#toTable()
键值存储维护来自输入主题的原始数据。每次键更改时,您现在都可以访问键的旧记录(来自存储)和新记录(输入到tranform()
),并且可以发出相应的记录以更新下游KTable
。
推荐阅读
- python - Python循环优化涉及二维数组和重复的一维数组的元素乘积
- react-native - 在反应本机滚动视图中设置偏移限制
- android - protobuf lite和protobuf java中的重复类
- python - 从 Python 3 中的 try-catch 块内部退出脚本
- android - 如何在 android 应用程序的活动中显示特定于语言的语法突出显示的代码?
- javascript - 有条件地使用 setState 和 redux 状态的正确方法
- javascript - Capybara 测试更改脚本有效并向 HTML 字段添加属性
- python - 从几个列表列表创建数据框
- php - 如何修复以下 PHP 警告?
- vuejs2 - 如何制作无限嵌套循环