Listener does not consume messages when testing with EmbeddedKafka in Spring

Problem description

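I am trying to verify that a service is called when my consumer receives a message from a Kafka topic, but the test fails and the consumer never even receives the message.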

My test:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ConsumerTest {

    @Autowired
    Consumer consumer;

    @Mock
    private Service service;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Test
    public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() 
      throws Exception {

        String message = "hi";

        kafkaTemplate.send("topic", message);

        consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);

       
        verify(service, times(1)).addMessage();
    }
}

The consumer in the main package is an ordinary consumer annotated with @KafkaListener(topics = "topic"). I also have a configuration class:

@EnableKafka
@Configuration
public class KafkaConfig {
  
  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
        new StringDeserializer());
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }


}
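
The Consumer class itself is not shown in the question. The following is a minimal, JDK-only sketch of the latch handoff that the test relies on through consumer.getLatch(). The names LatchConsumer, MessageService, and LatchConsumerDemo are hypothetical, and delivery is simulated with a plain thread instead of a Kafka listener container. It mainly illustrates that CountDownLatch.await(long, TimeUnit) returns a boolean, which a test can assert on to tell "the message never arrived" apart from "the service was not called".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Service that the test verifies.
interface MessageService {
    void addMessage();
}

// Hypothetical shape of the consumer. In the real application, listen() would
// carry @KafkaListener(topics = "topic") and the class would be a Spring bean.
class LatchConsumer {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final MessageService service;

    LatchConsumer(MessageService service) {
        this.service = service;
    }

    void listen(String message) {
        service.addMessage();   // do the work first
        latch.countDown();      // then release any waiting test thread
    }

    CountDownLatch getLatch() {
        return latch;
    }
}

class LatchConsumerDemo {
    // Simulates delivery on another thread (as a listener container would)
    // and reports whether the latch was released before the timeout.
    static boolean deliverAndAwait(LatchConsumer consumer, String message)
            throws InterruptedException {
        Thread container = new Thread(() -> consumer.listen(message));
        container.start();
        return consumer.getLatch().await(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        LatchConsumer consumer = new LatchConsumer(calls::incrementAndGet);
        boolean received = deliverAndAwait(consumer, "hi");
        System.out.println("received=" + received + ", calls=" + calls.get());
    }
}
```

In the sketch the service is handed to the consumer through its constructor. In a @SpringBootTest, a field annotated with Mockito's @Mock is not registered in the application context, so the listener bean would only see the mock if it were registered there, for example with Spring Boot's @MockBean.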

And in application.properties (inside the test package) I put this:

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: group
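
One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis: the KafkaConfig class builds its ConsumerFactory from its own consumerConfigs() map, so the spring.kafka.consumer.* settings are not applied to that factory. A Kafka consumer without auto.offset.reset defaults to latest and could miss a record that was sent before its partitions were assigned. The JDK-only sketch below shows what the map might look like with the setting included. It uses literal keys (the string values of the ConsumerConfig constants) so that it runs without Kafka on the classpath. The class name ConsumerPropsSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {
    // System property that @EmbeddedKafka sets to the embedded broker address(es).
    static final String EMBEDDED_BROKERS_PROPERTY = "spring.embedded.kafka.brokers";

    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Prefer the embedded broker address when present; fall back to the
        // address hard-coded in the question.
        props.put("bootstrap.servers",
                System.getProperty(EMBEDDED_BROKERS_PROPERTY, "localhost:9092"));
        // Kafka accepts deserializers as fully qualified class names.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "group");
        // Read from the start of the topic when the group has no committed offset.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        consumerConfigs().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the real configuration class the same entries would normally be written with the ConsumerConfig constants, for example ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.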

Tags: spring, testing, apache-kafka, embedded-kafka

Solution

