java - 使用 Spring Kafka 编写的单元测试 Kafka Consumer
问题描述
我创建了一个 Kakfa 消费者应用程序(使用 spring kafka),它似乎工作正常。现在,我正在尝试为其添加单元测试用例。我的消费者是批量确认消费者,它是在容器内启动的。消费者的详细信息可以在下面的堆栈溢出帖子中找到(这是我问的)
我可以通过做一些研究为我的消费者编写一个测试用例,但在这里我必须复制容器的创建和其他一些东西才能使其工作。我想知道,是否有任何其他方式让测试用例使用在应用程序启动期间启动的同一容器(可能通过指向测试上下文)和在启动期间启动的消费者直接使用嵌入式 kafka 实例。
我想出的解决方案如下,但不确定这是不是正确的方法。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Launcher.class)
public class BatchConsumerTest {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "topic1");
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaTemplate<String, String> template;
@MockBean
private RestService restService;
@Before
public void setup() {
Mockito.when(restService.invokeService("")).thenReturn("");
}
@Test
public void test() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = null;
for (MessageListenerContainer con : registry.getAllListenerContainers()) {
container = (ConcurrentMessageListenerContainer<?, ?>) con;
container.stop();
}
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group1", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(
consumerProps);
ContainerProperties containerProps = new ContainerProperties("ibo-grocerybag-subscription");
containerProps.setAckMode(AckMode.MANUAL);
ConcurrentMessageListenerContainer<String, String> messageListContainer = new ConcurrentMessageListenerContainer<String, String>(
factory, containerProps);
BatchAcknowledgingConsumerAwareMessageListener<String, String> listener = new BatchConsumer();
CountDownLatch latch = new CountDownLatch(1);
// messageListContainer.setupMessageListener(listener);
messageListContainer.setupMessageListener(new BatchAcknowledgingConsumerAwareMessageListener<String, String>() {
@Override
public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
System.out.println("*******Data : " + data.get(0).value());
listener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
messageListContainer.start();
ContainerTestUtils.waitForAssignment(messageListContainer,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
template.send("topic1", "Hello");
latch.await(10000, TimeUnit.MILLISECONDS);
assertThat(latch.getCount()).isEqualTo(0);
}
@After
public void destroy() {
embeddedKafka.getEmbeddedKafka().destroy();
}
解决方案
推荐阅读
- php - 在 wordpress 中仅显示自定义类别图像
- asp.net - 使用第二个数据库调用刷新/更新实体框架导航属性数据
- sql - 向上或向下舍入数字,基于 0.5 并最接近整数
- simpleitk - Simpleitk:我可以在一个步骤中为多个图像使用单个标签吗?
- date - 在 ssrs 多维数据集中创建日期参数,得到无效令牌错误
- python - 使用 pcap 模板时出现 Ipv6Packet 错误
- jsf - 单击菜单后从 p:tieredMenu 隐藏菜单项
- reactjs - SecurityError:操作不安全 - React Heroku
- android - 当我通过 `TextAppearance` 间接应用它时,为什么会忽略`android:maxLines`
- java - java.lang.BootstrapMethodError:从 Athena java 类调用站点初始化异常