首页 > 解决方案 > 使用 Spring Kafka 编写的单元测试 Kafka Consumer

问题描述

我创建了一个 Kakfa 消费者应用程序(使用 spring kafka),它似乎工作正常。现在,我正在尝试为其添加单元测试用例。我的消费者是批量确认消费者,它是在容器内启动的。消费者的详细信息可以在下面的堆栈溢出帖子中找到(这是我问的)

Spring-Kafka 并发属性

我可以通过做一些研究为我的消费者编写一个测试用例,但在这里我必须复制容器的创建和其他一些东西才能使其工作。我想知道,是否有任何其他方式让测试用例使用在应用程序启动期间启动的同一容器(可能通过指向测试上下文)和在启动期间启动的消费者直接使用嵌入式 kafka 实例。

我想出的解决方案如下,但不确定这是不是正确的方法。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Launcher.class)
public class BatchConsumerTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "topic1");
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @MockBean
    private RestService restService;

    @Before
    public void setup() {
        Mockito.when(restService.invokeService("")).thenReturn("");
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = null;
        for (MessageListenerContainer con : registry.getAllListenerContainers()) {
            container = (ConcurrentMessageListenerContainer<?, ?>) con;
            container.stop();
        }
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group1", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(
                consumerProps);
        ContainerProperties containerProps = new ContainerProperties("ibo-grocerybag-subscription");
        containerProps.setAckMode(AckMode.MANUAL);
        ConcurrentMessageListenerContainer<String, String> messageListContainer = new ConcurrentMessageListenerContainer<String, String>(
                factory, containerProps);
        BatchAcknowledgingConsumerAwareMessageListener<String, String> listener = new BatchConsumer();
        CountDownLatch latch = new CountDownLatch(1);
        // messageListContainer.setupMessageListener(listener);
        messageListContainer.setupMessageListener(new BatchAcknowledgingConsumerAwareMessageListener<String, String>() {

            @Override
            public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer) {
                System.out.println("*******Data : " + data.get(0).value());
                listener.onMessage(data, acknowledgment, consumer);
                latch.countDown();
            }
        });
        messageListContainer.start();
        ContainerTestUtils.waitForAssignment(messageListContainer,
                embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        template.send("topic1", "Hello");
        latch.await(10000, TimeUnit.MILLISECONDS);
        assertThat(latch.getCount()).isEqualTo(0);
    }

    @After
    public void destroy() {
        embeddedKafka.getEmbeddedKafka().destroy();
    }

标签: javaapache-kafkaspring-kafka

解决方案


推荐阅读