java - Kafka Streams 键加入,条件复杂
问题描述
我正在尝试按键加入KStream
,GlobalKTable
但有特定的逻辑。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});
例如,如果 key = "ABC",那么:
- 首先,按完整键加入 - 即“ABC”=“ABC”
- 然后,如果未加入,则通过前两个符号加入(删除一个符号) - 即“AB”=“AB”
- 最后,尝试仅通过一个符号加入 - 即“A”=“A”
此外,还需要知道执行连接的条件 - 例如,按 3 个字母/按 2 个字母/按 1 个字母。
问题是,有可能还是我应该寻找解决方法?例如,使用相应的键(带有“ABC”键的表,一个带有“AB”键和一个带有“A”键的表)复制 GlobalKTable 并执行 3 个单独的连接?或者也许还有其他建议?
提前致谢!
解决方案
可以对多个表使用一系列左连接(如果您知道经常想尝试连接)。如果连接成功,则跳过下一个连接。使用leftJoin()
和的组合branch()
应该允许您在每次加入后将流拆分为“加入”和“重试”。最后,您可以根据需要merge()
将不同的结果流放在一起。
推荐阅读
- mysql - 将 Cloud Pak For Data 连接到 mysql 数据库时出现 401 错误
- dart - 限制或扩展 dart 中的枚举
- reactjs - Netlify 中的动态表单
- html - Google Apps 脚本 - 从 HTML 创建 PDF 时不显示图像
- java - 如何在 Java 中评估带有逻辑运算符的表达式?
- reactjs - 将函数的结果传递到链上,以便父级更改
- php - 如何以类型安全的方式在 PHP 中处理用户输入
- php - 从文本中的不同格式中提取纬度和经度
- javascript - 为什么我的异步函数总是返回一个未决的承诺?
- slack - Slack API:如何向频道中的每个成员发送预定的私人消息?