spring - 侦听器在使用 EmbeddedKafka Spring 进行测试时不使用消息
问题描述
我试图查看当消费者收到来自 kafka 主题的消息但测试没有通过并且消费者甚至没有收到消息时是否调用了服务。
我的测试:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class ConsumerTest {
@Autowired
Consumer consumer;
@Mock
private Service service;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
String message = "hi";
kafkaTemplate.send("topic", message);
consumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
verify(service, times(1)).addMessage();
}
}
主包中的消费者是具有@KafkaListener(topics = "topic") 的普通消费者。然后我有一个配置文件:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
而且在 application.properties (在测试包内)我把这个:
春天:kafka:消费者:自动偏移重置:最早的组ID:组
解决方案
推荐阅读
- python - 使用 SARIMAX 在 python 上获得预测错误
- python - 在每个组内的值总和不超过 N 的条件下,将值列表排序到最少数量的组中的算法
- javascript - 从 Facebook 浏览器(在移动应用程序上)共享 sencha extjs 站点会出现白屏和错误
- javascript - 如何结合显示、不透明度和过渡?
- excel - 用值覆盖过滤表
- angular - RXJS - 等到商店不为空
- python - Flask & HTML,表单未提交
- python - 在 pymc3 中拟合非对称高斯参数
- node.js - 如何使用 blob 服务以编程方式删除 Azure 存储中未提交的块?
- python - Python 聊天室“ConnectionResetError: [WinError 10054] 现有连接被远程主机强行关闭”