spring-boot - 使用 JUnit 5 和 EmbeddedKafkaBroker 在 Spring Boot 应用程序中测试 Apache Kafka 集成
问题描述
我有一个简单的生产者类定义如下:
@Configuration
public class MyKafkaProducer {
private final static Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);
@Value("${my.kafka.producer.topic}")
private String topic;
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
public void sendDataToKafka(@RequestParam String data) {
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
listenableFuture.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Sent data {}", result.getProducerRecord().value());
}
@Override
public void onFailure(Throwable ex) {
log.error("Unable to send data {} due to: {}", data, ex.getMessage());
}
});
}
}
这是正在进行的测试类:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {
private static final String TOPIC = "device";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
private MyKafkaProducer producer;
BlockingQueue<ConsumerRecord<String, String>> records;
KafkaMessageListenerContainer<String, String> container;
@BeforeAll
void setUp() {
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(TOPIC);
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterAll
void tearDown() {
container.stop();
}
@Test
public void testIfWorks() throws InterruptedException {
// Arrange
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
// Act
producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
producer.flush();
// Assert
ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
}
问题是测试创建了一个默认生产者:
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
如何使用我自己的生产者MyKafkaProducer
,并调用它的sendDataToKafka
方法?在这种情况下,我们如何测试以及测试什么?
解决方案
所以这是一个 Spring Boot 应用程序,您正在使用自动配置的KafkaTemplate
.
要覆盖bootstrap-servers
使用嵌入式 kafka 代理,请参阅https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
然后,您可以从测试用例中调用您的生产者。
推荐阅读
- c - 为什么我的 C http 请求会导致非法指令:4
- matplotlib - 多个子图右侧的唯一颜色条
- reactjs - react-testing-library - 无效的钩子调用
- android - Android Studio 3.6.1,布局设计视图未显示
- xpages - 如何使用 ODA(OpenNTF Domino API)制作 REST API?
- c# - 有没有可以用来设计自己的哈希函数库的程序?
- opencv - 如何从单个 PNG 中分离大小为 8 和 16 的两个位图
- html - css 动态 nth-child 选择器
- docker - docker compose up 无法挂载卷
- javascript - 如何处理 React Router 递归路径?