java - 如何手动启动 Kafka 监听器?
问题描述
我有一些带有注释的方法,@KafkaListener
但我只想手动启动其中的一些(取决于某些条件)。
@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
// consume
}
@PostConstruct
private void startConsumers() {
if (true) {
kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
}
}
但此时kafkaListenerEndpointRegistry.getListenerContainers()
是空列表并kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
返回null
。因此,调用方法的那一刻可能@PostConstruct
还为时过早,并且侦听器仍未注册。我尝试使用注释startConsumers()
方法,@Scheduled(fixedDelay = 100)
并且侦听器已经可用。但是@Scheduled
对于我想在启动应用程序后调用一次的东西来说,使用并不是一个好的决定。
解决方案
你不能这样做@PostConstruct
——在应用程序上下文生命周期中还为时过早。
在方法中实现SmartLifecyle
将阶段设置为Integer.MAX_VALUE
并启动容器start()
。
或者使用@EventListener
并监听ApplicationStartedEvent
(如果使用 Spring Boot)ContextRefreshedEvent
或非 Boot Spring 应用程序。
推荐阅读
- html - 拖放克隆对象背景
- javascript - 这里映射 API JS 聚类:需要相对值而不是聚类权重的数量
- sql-server - 最后一个空格后的子字符串
- python - Pandas - 删除重复项但根据列中的值更改 keep:first/last
- clojure - 带有基本身份验证的 Composure + Swagger
- python-3.x - 使用 Python 连接到 HBase 的推荐方法是什么?
- arrays - 如何使用 ColdFusion 从 CSV 文件制作关联数组?
- react-native - ssl pinning 与 react native 和 ios
- c# - 如何首先连接到ef core数据库中的两个不同的数据库
- typescript - 如何在 vue 组合 api 组件中使用 jest 进行单元测试?