How to set the port in KafkaEmbedded when unit testing a spring-kafka consumer

Question

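I am using spring-boot-starter-parent version 1.5.0.RELEASE, spring-kafka version 1.0.0.RELEASE, and spring-kafka-test version 1.0.0.RELEASE in an application that consumes messages from a Kafka 0.9 cluster. I wrote a unit test for my consumer using KafkaEmbedded, but it fails because the broker port is chosen randomly. Is there a way to set this broker property without changing versions? Or which versions should I use so that nothing breaks?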

Here is the code for the KafkaListener and the KafkaConsumerTest:

Listener.java

@Service
public class Listener {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);
    private CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(topics = "topic", group = "group", containerFactory = "kafkaListenerContainerFactory")
    public void consumeClicks(@Payload String msg, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, @Header(KafkaHeaders.OFFSET) Integer offset, Acknowledgment ack) throws Exception {
        logger.info(msg);
        latch.countDown();
        ack.acknowledge();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

KafkaConsumerTest.java

@DirtiesContext
@SpringBootTest(classes = {SpringApplication.class})
@RunWith(SpringRunner.class)
public class KafkaConsumerTest {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
    private static String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    public KafkaTemplate<String, String> template;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    private Listener listener;

    @Before
    public void init(){
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
        Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());
        senderProps.put("key.serializer", StringSerializer.class);
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps);
        template = new KafkaTemplate<>(producerFactory);
        template.setDefaultTopic(TEST_TOPIC);
    }

    @Test
    public void testConsume() throws Exception {
        String record = "message";
        template.sendDefault(TEST_TOPIC, record);
        logger.debug("test-consume sent record {}", record);
        listener.getLatch().await(1000, TimeUnit.MILLISECONDS);
        Assert.assertEquals(listener.getLatch().getCount(), 0);
    }
}

Tags: java, spring-kafka, spring-kafka-test

Answer


Use spring-kafka 1.3.9 with Boot 1.5; earlier versions are no longer supported. The current Boot 1.5.x release is 1.5.21.

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

static {
    embeddedKafka.setKafkaPorts(1234);
}

setKafkaPorts is available since 1.3.
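A fixed port such as 1234 can collide with another process on the build machine, which is one reason the random port is usually preferable. As a rough sketch (the class name PortCheck and the helper isPortFree are hypothetical, not part of spring-kafka), a test could check beforehand whether the chosen port can be bound. The check is inherently racy, since the port may be taken between the check and the broker start, so treat it as a diagnostic aid only.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortCheck {

    // Hypothetical helper: returns true if a server socket can currently
    // be bound to the given port on this machine.
    public static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            // Binding failed, most likely because the port is already in use.
            return false;
        }
    }

    public static void main(String[] args) {
        int port = 1234; // the fixed port used in the snippet above
        System.out.println("Port " + port + " free: " + isPortFree(port));
    }
}
```

If the port is not free, failing the test early with a clear message is usually easier to diagnose than a broker startup error.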

However, you are already using the assigned random port correctly in the test:

Map<String, Object> senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString());

To get the Kafka listener to connect to the embedded broker, you can use:

    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString()); 
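One thing to watch with the system property is timing: with SpringRunner the application context is created before @Before methods run, so a property set there may arrive too late for the listener container. A minimal sketch, assuming Boot's Kafka auto-configuration reads spring.kafka.bootstrap-servers, that the listener container factory is built from it, and that a @SpringBootApplication class sits in a parent package of the test, sets the property in @BeforeClass instead. The method names setBootstrapServers and contextLoads are illustrative.

```java
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest // assumption: the application class is found by package scanning
public class KafkaConsumerTest {

    private static final String TEST_TOPIC = "topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEST_TOPIC);

    // Runs after the class rule has started the broker, but before the
    // Spring context (and with it the @KafkaListener container) is created.
    @BeforeClass
    public static void setBootstrapServers() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void contextLoads() {
        // Placeholder; the real send-and-await test from the question goes here.
    }
}
```

Because the class rule wraps the @BeforeClass methods, getBrokersAsString() already returns the randomly assigned address at that point, so no fixed port is needed.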
