java - @kafkaListener 的范围
问题描述
我只是想了解@kafkaListener 的范围是什么,无论是原型还是单例。如果单个主题有多个消费者,它是返回单个实例还是多个实例。就我而言,我有多个客户订阅了单个主题并获取报告。我只是想知道如果会发生什么
多个客户要同时查询报表。就我而言,我在成功消费消息后关闭容器,但同时如果其他人想要获取报告,容器应该是打开的。
如何将范围更改为与容器 Id 关联的原型(如果不是),以便每次都可以生成单独的实例。
@KafkaListener(id = "id1", topics = "testTopic" ) public void listen() { // code goes here }
解决方案
为所有消费线程调用单个侦听器实例。
注释 @KafkaListener 不是原型范围的,并且此注释也不可能。
4.1.10. Thread Safety
When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:
Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).
Keep the state in ThreadLocal<?> instances.
Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).
To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.
https://docs.spring.io/spring-kafka/reference/html/
=== 已编辑 ===
让我们采取他们的第三个选项(Delcaring a SimpleThreadScope 并委托给它)
注册 SimpleThreadScope 。它不会自动拾取。您需要像下面这样注册它:
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
return new BeanFactoryPostProcessor() {
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
beanFactory.registerScope("thread", new SimpleThreadScope());
}
};
}
使用 scopeName = "thread" 创建一个组件
@Component
@Scope(scopeName = "thread", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class KafkaDelegate{
public void handleMessageFromKafkaListener(String message){
//Do some stuff here with Message
}
}
创建一个@Service
public class KafkaListenerService{
@Autowired
private KafkaDelegate kafkaDelegate;
@KafkaListener(id = "id1", topics = "testTopic" )
public void listen(String message) {
kafkaDelete.handleMessageFromKafkaListener(message);
}
}
推荐阅读
- java - 在我的应用程序中,null 不能转换为非 null 类型 java.util.HashMap
- python - 尝试提取 URL 时使用 Urllibopener 时引发 HTTP 错误
- c - 从起始余额/查找净值/总计循环
- css - CSS 不适用于 IE 11 和 saffari
- c# - 即使操作成功,面对对象引用也未设置异常
- spring-boot - Kotlin 数据类 No String-argument constructor with spring data rest
- visual-studio - 无法使用 .NET Standard 2.0 类库构建和调试我的 ASP.NET Core 2.1 应用程序
- python - 分组数据的平均值
- php - 包含包含的 PHP 文件路径
- python - PySpark中的Aroon指标:如何计算每组中最大值和当前值之间的行数