spring-boot - how to configure two instances of Kafka StreamsBuilderFactoryBean in spring boot
Problem description
Using spring-boot 2.1.3 and spring-kafka 2.2.4, I want to have two streams configurations (e.g. to use a different application.id, or to connect to a different cluster). I defined the first streams configuration pretty much according to the docs, then added a second one under a different bean name, plus a second StreamsBuilderFactoryBean (also with a different name):
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //...
        return new KafkaStreamsConfiguration(props);
    }

    @Bean(name = "myKappConfig")
    public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //...
        return new KafkaStreamsConfiguration(props);
    }

    @Bean(name = "myKappConfigStreamBuilder")
    public StreamsBuilderFactoryBean myAppStreamBuilder(
            @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
        return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
    }
However, when I try to run the app, I get:
    Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration required a single bean, but 2 were found:
        - &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class]
        - &myKappConfigStreamBuilder: defined by method 'myAppStreamBuilder' in class path resource [com/teramedica/kafakaex001web/KafkaConfig.class]
This happens because the spring-boot auto-configuration injects a single StreamsBuilderFactoryBean by type:
    @Bean
    public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
            StreamsBuilderFactoryBean factoryBean) {
        return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
    }
Short of replacing the KafkaStreamsAnnotationDrivenConfiguration entirely, how do I define more than one StreamsBuilderFactoryBean? Or, alternatively, how can I change the properties for a given stream?
Solution
Mark one of the factory beans with @Primary.
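A minimal sketch of what that could look like, reusing the bean names from the question. Several things here are assumptions rather than anything the question or answer states: that the configuration class carries @EnableKafkaStreams (the error message suggests KafkaStreamsDefaultConfiguration is active), that the user-defined factory bean is the one marked @Primary (the default one is declared inside spring-kafka and cannot be annotated directly), and that StreamsBuilderFactoryBean is imported from the config package (older spring-kafka versions used org.springframework.kafka.core). The baseProps helper is hypothetical and only there to avoid repeating the map setup.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
// Assumption: package for spring-kafka 2.2+; adjust if your version differs.
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

@Configuration
@EnableKafkaStreams // assumption: registers the default StreamsBuilderFactoryBean
public class KafkaConfig {

    // Hypothetical helper: builds the properties shared by both configurations.
    private static Map<String, Object> baseProps(String applicationId) {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    // Picked up by the default factory bean that @EnableKafkaStreams registers.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        return new KafkaStreamsConfiguration(baseProps("myappId1000"));
    }

    // Second, independently named streams configuration.
    @Bean(name = "myKappConfig")
    public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
        return new KafkaStreamsConfiguration(baseProps("myappId9999"));
    }

    // @Primary makes this bean win whenever a single StreamsBuilderFactoryBean
    // is injected by type, which is what the auto-configuration does.
    @Primary
    @Bean(name = "myKappConfigStreamBuilder")
    public StreamsBuilderFactoryBean myAppStreamBuilder(
            @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
        return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
    }
}
```

One consequence to keep in mind with this choice: any unqualified injection point, including the auto-configuration's configurer, should now resolve to the primary factory bean. Code that needs the other one would have to ask for it by name, for example with @Qualifier("defaultKafkaStreamsBuilder").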