spring-boot - 如何使用 Spring Kafka 测试 Kafka Streams 应用程序?
问题描述
我正在使用 Kafka Streams、Spring-Kafka 和 Spring Boot 编写流应用程序。我找不到任何信息如何在使用 Spring-Kafka 时正确测试 Kafka Streams DSL 完成的流处理。文档提到 EmbeddedKafkaBroker,但似乎没有关于如何处理例如状态存储的测试的信息。
只是为了提供一些我想测试的简单示例。我注册了以下 bean(其中 Item 是 avro 生成的):
@Bean
public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
return streamsBuilder
.stream(ITEM_TOPIC,
Consumed.with(Serdes.String(), itemAvroSerde))
.mapValues((id, item) -> item.getNumber())
.groupByKey()
.aggregate(
() -> 0L,
(id, number, agg) -> agg + number,
Materialized.with(Serdes.String(), Serdes.Long()));
}
测试所有项目编号是否汇总的正确方法是什么?
解决方案
Spring Kafka for Kafka Streams 支持不会带来任何额外的 API,尤其是在流构建及其处理方面。
我们最近为自己打开了一个很好的kafka-streams-test-utils
库,可以在没有任何 Kafka 代理启动(甚至嵌入)的情况下用于单元测试。
在我们的几个测试中,我们有这样的东西:
KStream<String, String> stream = builder.stream(INPUT);
stream
.transform(() -> enricher)
.to(OUTPUT);
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
new StringSerializer());
driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
assertThat(result.headers().lastHeader("foo")).isNotNull();
我相信应该有一些 APITopologyTestDriver
来处理提到的状态存储。
推荐阅读
- c++ - 如何在没有大量重复检查代码的情况下处理函数的多个可能的早期返回
- python - 带有 r 对数刻度的径向图的刻度位置
- tsql - 使用某些链接服务器组件执行触发器
- reactjs - react-table v7 总是重新挂载?
- javascript - TypeWriter 效果没有与标题内联
- python - 在 SymPy 中反转函数
- python-3.x - Python 3 Tkinter 画布窗口绑定
- python - PySide SizeHint 和 PaintEvent 返回不同的大小
- javascript - 使用javascript在每次点击时更改按钮内的文本?
- ios - Xcode v12.5 - 打开 SQlite DB 时生成错误