首页 > 解决方案 > 使用 JUnit 5 和 EmbeddedKafkaBroker 在 Spring Boot 应用程序中测试 Apache Kafka 集成

问题描述

我有一个简单的生产者类定义如下:

@Configuration
public class MyKafkaProducer {

    private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {

        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);

        listenableFuture.addCallback(new ListenableFutureCallback<>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent data {}", result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}", data, ex.getMessage());
            }
        });
    }
}

这是正在进行的测试类:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;

    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }

问题是测试创建了一个默认生产者:

Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

如何使用我自己的生产者MyKafkaProducer,并调用它的sendDataToKafka方法?在这种情况下,我们如何测试以及测试什么?

源代码可以在这里找到。正在进行工作测试的分支在这里。谢谢你。

标签: spring-bootspring-kafkaspring-test

解决方案


所以这是一个 Spring Boot 应用程序,您正在使用自动配置的KafkaTemplate.

要覆盖bootstrap-servers使用嵌入式 kafka 代理,请参阅https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation

@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")

然后,您可以从测试用例中调用您的生产者。


推荐阅读