首页 > 解决方案 > 如何手动启动 Kafka 监听器?

问题描述

我有一些带有注释的方法,@KafkaListener但我只想手动启动其中的一些(取决于某些条件)。

@KafkaListener(id = "consumer1", topics = "topic-name", clientIdPrefix = "client-prefix", autoStartup = "false")
public void consumer1(String message) {
    // consume
}
@PostConstruct
private void startConsumers() {
    if (true) {
        kafkaListenerEndpointRegistry.getListenerContainer("consumer1").start();
    }
}

但此时kafkaListenerEndpointRegistry.getListenerContainers()是空列表并kafkaListenerEndpointRegistry.getListenerContainer("consumer1")返回null。因此,调用方法的那一刻可能@PostConstruct还为时过早,并且侦听器仍未注册。我尝试使用注释startConsumers()方法,@Scheduled(fixedDelay = 100)并且侦听器已经可用。但是@Scheduled对于我想在启动应用程序后调用一次的东西来说,使用并不是一个好的决定。

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


你不能这样做@PostConstruct——在应用程序上下文生命周期中还为时过早。

在方法中实现SmartLifecyle将阶段设置为Integer.MAX_VALUE并启动容器start()

或者使用@EventListener并监听ApplicationStartedEvent(如果使用 Spring Boot)ContextRefreshedEvent或非 Boot Spring 应用程序。


推荐阅读