java - 嵌入式 Kafka Spring 测试在嵌入式 Kafka 准备就绪之前执行
问题描述
我有一个 Spring Boot 项目,它有一个 Kafka 侦听器,我想使用 Embedded Kafka 进行测试。我让 Kafka Listener 注销消息“收到记录”。仅当我在方法的开头添加 a 时才会注销Thread.sleep(1000)
。
测试类:
@SpringBootTest
@DirtiesContext
@EnableKafka
@EmbeddedKafka(partitions = 1, topics = { "my-topic" }, ports = 7654)
class KafkaTest {
private static final String TOPIC = "my-topic";
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Test
void testSendEvent() throws ExecutionException, InterruptedException {
// Thread.sleep(1000); // I wont see the Listener log message unless I add this sleep
Producer<Integer, String> producer = configureProducer();
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(TOPIC, "myMessage");
producer.send(producerRecord).get();
producer.close();
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(kafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
我不想使用善变Thread.sleep()
的测试显然是在一些设置过程完成之前执行的。我显然需要等待某些事情,但我不知道该怎么做。
使用:
- 爪哇 11
- 春季启动 2.5.6
- JUnit 5
- spring-kafka-test 2.7.8
解决方案
将 bean 添加到测试上下文并(例如)在收到a时@EventListener
倒计时 a ;然后在测试中CountDownLatch
ConsumerStartedEvent
assertThat(eventListner.getLatch().await(10, TimeUnit.SECONDS)).isTrue();
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#events
和
https://docs.spring.io/spring-kafka/docs/current/reference/html/#event-consumption
或者添加一个ConsumerRebalanceListener
并等待分区分配。
推荐阅读
- r - ggplot 中的图例名称 - 重命名
- javascript - 如何通过html将flask base_url发送到js文件
- c# - 在 .Net Core 中将 Angular 作为 Windows 服务启动时出错
- php - WordPress:如何从模板部分单击加载引导模式内容?
- asp.net-mvc - 控制器中的 ASP.NET MVC 选择查询
- mysql - 如何快速删除mysql中的重复记录
- c# - 你可以为类使用类似数组的构造函数吗
- google-analytics - Google Analytics:根据作为事件参数传递的字段对数据进行分类
- javascript - 将 javascript 板数组转换为 html tictactoe 板
- android - 不工作`ConstraintLayout`消失可见性