spring-boot - Kafka Streams以编程方式配置不起作用
问题描述
我正在做一个spring boot应用程序,我正在尝试以编程方式配置kafka,但由于某种原因仍然从application.yaml获取属性,而不是我以编程方式设置的属性
@Configuration
public class KafkaConfiguration {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(30000);
return factory;
}
public ConsumerFactory<String, String> kafkaConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // should crash since is not valid
props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
return new DefaultKafkaConsumerFactory<>(props);
}
}
@Component
public class StreamListener {
@StreamListener(TestStreams.TEST_STREAM_IN)
public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
log.debug("Received generic event {} with headers {}", response, headers);
}
}
public interface TestStreams {
String TEST_STREAM_IN = "test-stream-in";
@Input(TEST_STREAM_IN)
SubscribableChannel inputTestStream();
}
@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication .class, args);
}
}
解决方案
binder 不使用 a KafkaListenerContainerFactory
,它从 yaml 创建容器本身。
ListenerContainerCustomizer
您可以通过添加bean来修改容器。
推荐阅读
- html - 具有多个 dd 元素到 1 个 dt 元素的内联定义列表
- python - 如何从识别的轮廓中获取极端边缘?
- java - 异步插入到 cassandra,并保存每个键的插入顺序
- python - 重新采样 pandas 数据帧并返回开始时间和结束时间
- spring - org.thymeleaf.exceptions.TemplateProcessingException:无法解析为表达式
- python - 如何在 Tensorflow 中保存张量的值
- python - Plotly 3D 绘图注释
- c++ - 在哪里查看 OMP 时间表(自动)选择?
- javascript - 映射对象的一种简单方法 - 前一个键是一个值,前一个值是一个值 - 在新对象中
- python - NodeJS中Python进程输出的重复输出问题