首页 > 解决方案 > 如何使用 Spring-Kafka 实现消费者线程安全

问题描述

我正在使用 spring boot 2.1.7.RELEASE 和 spring-kafka 2.2.7.RELEASE。我正在使用 @KafkaListener 注释来创建消费者,并且我正在使用消费者的所有默认设置。

这是我的消费者配置:

@Configuration
@EnableKafka
public class KafkaConsumerCommonConfig implements KafkaListenerConfigurer {

      @Bean
      public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(primaryConsumerFactory());
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
      }

      @Bean
      public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(sapphireKafkaConsumerConfig.getConfigs());
      }

}

由于某些原因,我在同一个应用程序中有多个消费者,如下所示。

@KafkaListener(topics = "TEST_TOPIC1")
public void consumer1(){

}

@KafkaListener(topics = "TEST_TOPIC2")
public void consumer2(){

}

@KafkaListener(topics = "TEST_TOPIC3")
public void consumer3(){

}

话虽如此,根据关于“消费者线程安全”的融合文件

一个线程中不能有多个属于同一组的消费者,也不能让多个线程安全地使用同一个消费者。每个线程一个消费者是规则。要在一个应用程序中运行同一组中的多个消费者,您需要在各自的线程中运行每个消费者。将消费者逻辑包装在自己的对象中,然后使用 Java 的 ExecutorService 启动多个线程,每个线程都有自己的消费者,这很有用。

现在我的问题是,当我使用上面的代码使用 spring-kafka 来解决这种情况时,我是否应该做任何额外的事情,或者没关系,因为组会随机生成,因为我没有指定?请建议。

标签: spring-kafka

解决方案


您的所有侦听器都针对不同的主题进行了配置。所以,他们的组完全无关紧要。当同一主题有多个消费者时,该小组就会出现。


推荐阅读