首页 > 解决方案 > 没有输入主题的拓扑将不会创建流​​线程和全局线程

问题描述

我正在编写一个 Kafka Streams 应用程序,并且我想在此应用程序中包含两个应用程序 ID,但我一直收到错误消息说“没有输入主题的拓扑将不会创建流​​线程和全局线程,必须订阅至少一个源主题或全局表。” 你能告诉我我在哪里做错了吗?太感谢了!

public class KafkaStreamsConfigurations {
    ...
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    @Primary
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        setDefaults(props);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        return new KafkaStreamsConfiguration(props);
    }

    public void setDefaults(Map<String, Object> props) {...}

    @Bean("snowplowStreamBuilder")
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        Map<String, Object> props = new HashMap<>();
        setDefaults(props);
        ...
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);

        Properties properties = new Properties();
        props.forEach(properties::put);
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
        streamsBuilderFactoryBean.setStreamsConfiguration(properties);
        return streamsBuilderFactoryBean;
    }
}

这是我的应用程序类。

public class SnowplowStreamsApp {
    @Bean("snowplowStreamsApp")
    public KStream<String, String> [] startProcessing(
        @Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
                KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
            .with(Serdes.String(), Serdes.String()))
            .mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
            .branch(...);
        return branches;
    }
}

标签: apache-kafkaapache-kafka-streamsspring-kafka

解决方案


命名您的工厂 beanDEFAULT_STREAMS_BUILDER_BEAN_NAME而不是snowplowStreamBuilder- 否则,默认工厂 bean 将在没有定义的流的情况下启动。


推荐阅读