首页 > 解决方案 > Spring Cloud Stream 分支流未按预期工作

问题描述

下面是分支代码,它只流向一个主题(第一个主题)。据我了解,它应该流向所有三个主题?

无论如何,我可以使用分支流式传输到三个主题?

@Bean
        public Function<KStream<String, Usesr>, KStream<String, User>[]> testprocess() {

            Predicate<String, User> stream1 = (k, v) -> v != null;
            Predicate<String, User> stream2 = (k, v) -> v != null;
            Predicate<String, User> stream3 = (k, v) -> v != null;

            return input -> input.map(
                    (key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .branch(stream1, stream2, stream3);

处理器的配置

        testprocess-in-0:
          destination: input.users
        testprocess-out-0:
          destination: users.test.out.0
        testprocess-out-1:
          destination: users.test.out.1
        testprocess-out-2:
          destination: users.test.out.2

标签: spring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


通过查看您的谓词,似乎第一个谓词总是获胜,而其他谓词则没有机会。在 Kafka Streams 分支中,第一个评估为 true 的谓词成功,相应的主题接收数据。您需要更改谓词中的逻辑,以便正确映射正确的主题。是一个例子。


推荐阅读