scala - 将 Apache Flink 中的关系数据作为 State 存储并通过属性进行查询
问题描述
我有一个包含表T1(id, name, age)和T2(id, subject)的数据库。Flink 使用debezium 之类的东西从数据库接收所有更新作为事件流。这些表相互关联,可以通过在id上连接 T1和T2来提取所需的数据。目前数据库的整个状态都存储在 Flink MapState 中,以id为 key。现在的问题是我需要根据名称从T1中选择行而不使用id。好像我需要一个关于T1(name)的索引为了让它更快。有什么办法可以自动索引它,而不必为每个表手动创建索引。这样做的推荐方法是什么?我知道表上的SQL 流,但我需要支持对表的更新。顺便说一句,我使用 Flink 和 Scala。任何指针/建议将不胜感激。
解决方案
我的理解是,您正在连接 T1 和 T2,并以键控状态存储来自这两个流的数据的一些表示(在 MapState 中),由 id 键控。听起来 T1 和 T2 会随着时间的推移而发展,您希望能够通过指定名称随时以交互方式查询联接。
一种想法是以您想要选择的名称进行广播,并使用 KeyedBroadcastProcessFunction 来处理它们。在它的 processBroadcastElement 方法中,您可以使用 ctx.applyToKeyedState 通过从 MapState 记录中提取数据(必须保存在此运算符中)来计算结果。我怀疑您会希望将名称用作这些 MapState 记录中的键,这样您就不必遍历每个映射中的所有条目来查找感兴趣的项目。
您将在https://training.data-artisans.com/exercises/ongoingRides.html中找到与此模式有些相似的示例。
推荐阅读
- javascript - NextJS/JavaScript - ReferenceError: 数组未定义 HTMLInputElement.onchange
- java - 使用 Gradle App Engine Deploy 将 Spring Boot 微服务部署到 Google App Engine 时出现 UnSupportClassVersionError
- flash - 在线重玩 2021 年的旧 Flash 游戏
- android - 将包含不可序列化对象的 HashMap 传递给另一个 Fragment
- ethereum - 类工厂智能合约的构造器不断还原
- ruby-on-rails - Turbolinks to Turbo 升级已破坏表单重定向
- typescript - 连接到 drivenet 的客户端应用程序未注册我的验证身份
- python - 如何将txt文件中的文本列表输入到python中的变量?
- c - 如何在不打印的情况下格式化 C (const char*) 字符串?
- python - 如何从重复的字符串中提取单词