首页 > 解决方案 > 如何使用 Spring Kafka 测试 Kafka Streams 应用程序?

问题描述

我正在使用 Kafka Streams、Spring-Kafka 和 Spring Boot 编写流应用程序。我找不到任何信息如何在使用 Spring-Kafka 时正确测试 Kafka Streams DSL 完成的流处理。文档提到 EmbeddedKafkaBroker,但似乎没有关于如何处理例如状态存储的测试的信息。

只是为了提供一些我想测试的简单示例。我注册了以下 bean(其中 Item 是 avro 生成的):


    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }

测试所有项目编号是否汇总的正确方法是什么?

标签: spring-bootapache-kafkaapache-kafka-streamsspring-kafkaspring-kafka-test

解决方案


Spring Kafka for Kafka Streams 支持不会带来任何额外的 API,尤其是在流构建及其处理方面。

我们最近为自己打开了一个很好的kafka-streams-test-utils库,可以在没有任何 Kafka 代理启动(甚至嵌入)的情况下用于单元测试。

在我们的几个测试中,我们有这样的东西:

    KStream<String, String> stream = builder.stream(INPUT);
    stream
            .transform(() -> enricher)
            .to(OUTPUT);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

    ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
            new StringSerializer());
    driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
    ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
    assertThat(result.headers().lastHeader("foo")).isNotNull();

我相信应该有一些 APITopologyTestDriver来处理提到的状态存储。


推荐阅读