apache-kafka - 没有输入主题的拓扑将不会创建流线程和全局线程
问题描述
我正在编写一个 Kafka Streams 应用程序,并且我想在此应用程序中包含两个应用程序 ID,但我一直收到错误消息说“没有输入主题的拓扑将不会创建流线程和全局线程,必须订阅至少一个源主题或全局表。” 你能告诉我我在哪里做错了吗?太感谢了!
public class KafkaStreamsConfigurations {
...
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
@Primary
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
return new KafkaStreamsConfiguration(props);
}
public void setDefaults(Map<String, Object> props) {...}
@Bean("snowplowStreamBuilder")
public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
Map<String, Object> props = new HashMap<>();
setDefaults(props);
...
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
Properties properties = new Properties();
props.forEach(properties::put);
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean();
streamsBuilderFactoryBean.setStreamsConfiguration(properties);
return streamsBuilderFactoryBean;
}
}
这是我的应用程序类。
public class SnowplowStreamsApp {
@Bean("snowplowStreamsApp")
public KStream<String, String> [] startProcessing(
@Qualifier("snowplowStreamBuilder") StreamsBuilder builder) {
KStream<String, String>[] branches = builder.stream(inputTopicPubsubSnowplow, Consumed
.with(Serdes.String(), Serdes.String()))
.mapValues(snowplowEnrichedGoodDataFormatter::formatEnrichedData)
.branch(...);
return branches;
}
}
解决方案
命名您的工厂 beanDEFAULT_STREAMS_BUILDER_BEAN_NAME
而不是snowplowStreamBuilder
- 否则,默认工厂 bean 将在没有定义的流的情况下启动。
推荐阅读
- docker - Docker-compose 有效,只是 Docker 使用 Dockerfile - 没有。为什么?
- python - cmd displaying output only after program has finished
- laravel - Laravel 8 - GET http://localhost:8000/css/app.css net::ERR_ABORTED 404 (Not Found)
- csv - 将 bytes.Buffer 与 csv.writer 一起使用
- r - 使用 stringr (r) 提取多个条件
- java - BufferedReader 中的 readLine() 是如何工作的?
- dask - 分布式 dask 调度程序节点是否需要与工作节点相同的环境?
- python - 快速将 SQL 数据序列化为 python 列表
- c++ - C ++访问结构中结构数组的元素
- c++ - 创建一个新字符以传递给 C 中的方法