首页 > 解决方案 > 嵌入式 Kafka Spring 测试在嵌入式 Kafka 准备就绪之前执行

问题描述

我有一个 Spring Boot 项目,它有一个 Kafka 侦听器,我想使用 Embedded Kafka 进行测试。我让 Kafka Listener 注销消息“收到记录”。仅当我在方法的开头添加 a 时才会注销Thread.sleep(1000)

测试类:

@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {

    private static final String TOPIC = "my-topic";

    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Test
    void testSendEvent() throws ExecutionException, InterruptedException {
        // Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
        Producer<Integer, String> producer = configureProducer();
        ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
        producer.send(producerRecord).get();
        producer.close();
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

我不想使用善变Thread.sleep()的测试显然是在一些设置过程完成之前执行的。我显然需要等待某些事情,但我不知道该怎么做。

使用:

标签: javaspring-bootapache-kafkaspring-kafkaembedded-kafka

解决方案


将 bean 添加到测试上下文并(例如)在收到a时@EventListener倒计时 a ;然后在测试中CountDownLatchConsumerStartedEvent

assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();

https://docs.spring.io/spring-kafka/docs/current/reference/html/#events

https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption

或者添加一个ConsumerRebalanceListener并等待分区分配。


推荐阅读