首页 > 解决方案 > Kafka Streams 键加入,条件复杂

问题描述

我正在尝试按键加入KStreamGlobalKTable但有特定的逻辑。

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
    GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"

    stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
            (key, value) -> key,
            (valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});

例如,如果 key = "ABC",那么:

此外,还需要知道执行连接的条件 - 例如,按 3 个字母/按 2 个字母/按 1 个字母。

问题是,有可能还是我应该寻找解决方法?例如,使用相应的键(带有“ABC”键的表,一个带有“AB”键和一个带有“A”键的表)复制 GlobalKTable 并执行 3 个单独的连接?或者也许还有其他建议?

提前致谢!

标签: javaapache-kafkaapache-kafka-streamskafka-join

解决方案


可以对多个表使用一系列左连接(如果您知道经常想尝试连接)。如果连接成功,则跳过下一个连接。使用leftJoin()和的组合branch()应该允许您在每次加入后将流拆分为“加入”和“重试”。最后,您可以根据需要merge()将不同的结果流放在一起。


推荐阅读