首页 > 解决方案 > Spring Cloud Stream Kafka 消费者测试

问题描述

我正在尝试按照 GitHub 上的建议设置测试链接

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("words");
        template.sendDefault("foobar");

    --> ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "output");
        log.debug(cr);
    }
    finally {
        pf.destroy();
    }

StreamProcessor 设置为

@StreamListener
    @SendTo("output")
    public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {

        return input.map((key, value) -> new KeyValue<>(value, new WordCount(value, 10, new Date(), new Date())));
    }

--> 由于@Streamprocessor 具有@SendTo("output"),因此行从不消耗我认为应该在主题“输出”上的消息

标签: apache-kafka-streamsspring-cloud-stream

解决方案


您需要从您output绑定的实际主题中消费。你有配置spring.cloud.stream.bindings.output.destination吗?这应该是您需要使用的值。如果您不设置,默认值将与绑定相同 -output在这种情况下。


推荐阅读