首页 > 解决方案 > @kafkaListener 的范围

问题描述

我只是想了解@kafkaListener 的范围是什么,无论是原型还是单例。如果单个主题有多个消费者,它是返回单个实例还是多个实例。就我而言,我有多个客户订阅了单个主题并获取报告。我只是想知道如果会发生什么

标签: javaapache-kafkaspring-kafka

解决方案


为所有消费线程调用单个侦听器实例。

注释 @KafkaListener 不是原型范围的,并且此注释也不可能。

4.1.10. Thread Safety

When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:

    Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).

    Keep the state in ThreadLocal<?> instances.

    Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).

To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
    By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective. 

在此处输入图像描述

https://docs.spring.io/spring-kafka/reference/html/

=== 已编辑 ===

让我们采取他们的第三个选项(Delcaring a SimpleThreadScope 并委托给它)

注册 SimpleThreadScope 。它不会自动拾取。您需要像下面这样注册它:

@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
    return new BeanFactoryPostProcessor() {
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

            beanFactory.registerScope("thread", new SimpleThreadScope());
        }
    };
}

使用 scopeName = "thread" 创建一个组件

    @Component
    @Scope(scopeName = "thread", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public class KafkaDelegate{


     public void handleMessageFromKafkaListener(String message){
  
             //Do some stuff here with Message
    }
}

创建一个@Service

public class KafkaListenerService{


    @Autowired
    private KafkaDelegate kafkaDelegate;

    
    @KafkaListener(id = "id1", topics = "testTopic" )
    public void listen(String message) {
        kafkaDelete.handleMessageFromKafkaListener(message);
    }

}

另一个例子:如何使用 Spring Kafka 实现有状态消息监听器?


推荐阅读