apache-kafka - 如何跟踪主题中两个事件之间的变化?
问题描述
MySTORE_TOPIC
是一个压缩主题,包含我的实体的当前状态,基于从EVENTS_TOPIC
(由 Kafka Streams 应用程序)消耗的事件。这样做的主要目的STORE_TOPIC
是作为(全局)KTable 加载。
EVENTS_TOPIC | STORE_TOPIC
|
VALUE | KEY VALUE
{"entity":"A", "a":1, "b":2} | A {"a":1, "b":2}
{"entity":"B", "c":3} | B {"c":3}
{"entity":"A", "d":4} | A {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5} | A {"a":0, "b":2, "d":4, "e":5}
当定义的属性子集发生更改时,新的消费者需要得到通知。
例如,如果我们决定跟踪属性“a”、“b”和“c”的变化,预期的输出将是:
A {"a":1, "b":2}
B {"c":3}
(Nothing)
A {"a":0, "b":2}
为此,我编写了一个新的 Kafka Streams 应用程序:
- 处理包含每个键所需属性的“本地存储”
STORE_TOPIC
作为流加载- 将传入消息与“本地商店”的内容进行比较(感谢 a
.transform()
) - 如果检测到更改,则将传入消息(仅我们需要跟踪的属性)写入新消息
OUT_TOPIC
并添加/替换“本地存储”中的条目
有没有更简单、更优雅的方式来实现这一目标?
您能否确认,如果我STORE_TOPIC
直接将其加载为 KTable : .table(STORE_TOPIC)
,我只能访问实体的当前状态,而无法访问实体的先前版本并对其进行比较?
解决方案
推荐阅读
- powerbi - 如何在与比较日期相同的列上计算 DAX 中最接近的较早日期
- node.js - 如何使用适用于 JavaScript 的 AWS 开发工具包动态创建内联 Lambda?
- rancher - 如何在 Rancher GUI 中屏蔽 DB_PASSWORD 字段?
- .net - 如何在 vb6 exe 和 .net dll 之间传递控制对象?
- mongodb - 具有多个字段的Spring数据mongoDB推送操作不起作用
- mysql - Mysql 使用内部连接数据多次更新数据
- javascript - 在 v-for 循环中记住上传时的图像位置
- ssh - sudo over ssh - 不接受密码时卡住
- vue.js - 如何在 VueJS 中使用 ThreeJS
- c# - 从 C# MVC 实体框架中的数据库 onclick 中删除条目