spring-kafka - 如何使用 Spring-Kafka 实现消费者线程安全
问题描述
我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.7.RELEASE。我正在使用 @KafkaListener 注释来创建消费者,并且我正在使用消费者的所有默认设置。
这是我的消费者配置:
@Configuration
@EnableKafka
public class KafkaConsumerCommonConfig implements KafkaListenerConfigurer {
@Bean
public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(primaryConsumerFactory());
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(sapphireKafkaConsumerConfig.getConfigs());
}
}
由于某些原因,我在同一个应用程序中有多个消费者,如下所示。
@KafkaListener(topics = "TEST_TOPIC1")
public void consumer1(){
}
@KafkaListener(topics = "TEST_TOPIC2")
public void consumer2(){
}
@KafkaListener(topics = "TEST_TOPIC3")
public void consumer3(){
}
话虽如此,根据关于“消费者线程安全”的融合文件
一个线程中不能有多个属于同一组的消费者,也不能让多个线程安全地使用同一个消费者。每个线程一个消费者是规则。要在一个应用程序中运行同一组中的多个消费者,您需要在各自的线程中运行每个消费者。将消费者逻辑包装在自己的对象中,然后使用 Java 的 ExecutorService 启动多个线程,每个线程都有自己的消费者,这很有用。
现在我的问题是,当我使用上面的代码使用 spring-kafka 来解决这种情况时,我是否应该做任何额外的事情,或者没关系,因为组会随机生成,因为我没有指定?请建议。
解决方案
您的所有侦听器都针对不同的主题进行了配置。所以,他们的组完全无关紧要。当同一主题有多个消费者时,该小组就会出现。
推荐阅读
- php - jenkins 对象实例化错误中的自动化测试使用 Laravel phpunit
- android - 无法从共享首选项中获取数据
- python - ValueError:传递值的形状为 (39, 1),索引暗示 (39, 7)
- java - 无法在 Spring Boot 和 PostgreSQL 之间配置 SSL 连接
- c# - C# 中的 ApiController 未读取变量
- asp.net-core - ASP.NET Core 中的@Helpers
- c# - C#如何覆盖只有get方法的字段?
- angular - 当我使用@input 传递数据时,我的数据会发生什么
- php - 通过 url 在 laravel 中传递布尔参数
- twilio - 如何设置简单的 IVR