java - 为 Kafka Consumer 编写 JUnit 测试
问题描述
我有一个订阅某个主题的 kafka 消费者。实施工作正常。但是当试图为此实现单元测试时,由于它是通过Runnable
接口实现的,所以会出现问题。
执行
@Override
public void run() {
kafkaConsumer.subscribe(kafkaTopics);
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
Map<String, InventoryStock> skuMap = new LinkedHashMap<>();
try {
// populating sku map with consumer record
for (ConsumerRecord<String, String> record : records) {
populateMap(skuMap, record.value());
}
if (MapUtils.isNotEmpty(skuMap)) {
// writing sku inventory with populated sku map
inventoryDao.updateInventoryTable(INVENTORY_JOB_ID, skuMap);
}
} catch (Exception e) {
}
kafkaConsumer.commitAsync();
}
}
我尝试使用MockConsumer
. 但在实现时需要分配给消费者。但是实施中的消费者并没有暴露在外面。这是我尝试过的。
@Before
public void onBefore() {
MockitoAnnotations.initMocks(this);
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
skuInventoryConsumer = new SkuInventoryConsumer(consumerProps);
KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class);
Whitebox.setInternalState(skuInventoryConsumer, "LOGGER", LOGGER);
Whitebox.setInternalState(skuInventoryConsumer, "kafkaConsumer", kafkaConsumerMock);
}
@Test
public void should_subscribe_on_topic() {
consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));
}
因为它是一个runnable
并且消费者没有暴露,所以这个测试没有按预期工作。我该如何解决这个问题?
解决方案
我建议使用 Mockito,如下面的示例
Consumer<String, String> kafkaConsumerLocal = mock(Consumer.class);
KafkaConsumer kafkaConsumer = spy(new KafkaConsumer("topic-name"));
ReflectionTestUtils.setField(kafkaConsumer, "threadPoolCount", 1);
ReflectionTestUtils.setField(kafkaConsumer, "consumer", kafkaConsumerLocal);
doNothing().when(kafkaConsumer).runConsumer();
doNothing().when(kafkaConsumer).addShutDownHook();
doReturn(kafkaConsumerLocal).when(consumerInit).getKafkaConsumer();
推荐阅读
- python - Plotly:如何在两条垂直线之间设置填充颜色?
- android - 较新的 Lottie 文件不适用于 Android 上的 React Native?如何导出为旧版?
- perl - 记忆一个 XML 解析函数会破坏它
- sql - 尝试将值插入表中,其中一些值将是选择语句,而其他值将是硬编码的
- matlab - 在 MATLAB 中将 3D 矩阵重塑为 4D 矩阵
- javascript - 在函数调用上反应无限循环
- javascript - ReactRouterDom, AuthRoute 返回反应渲染函数作为反应子警告无效
- php - 在 3 列中显示字母查询
- javascript - Leaflet - 使用 Leaflet-Geoman 插件选择多个 geoJSON 多边形特征
- openedge - Progress-4gl:事务范围如何应用于外部程序调用?