apache-kafka-streams - Spring Cloud Stream Kafka 消费者测试
问题描述
我正在尝试按照 GitHub 上的建议设置测试链接
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("words");
template.sendDefault("foobar");
--> ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "output");
log.debug(cr);
}
finally {
pf.destroy();
}
StreamProcessor 设置为
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
return input.map((key, value) -> new KeyValue<>(value, new WordCount(value, 10, new Date(), new Date())));
}
--> 由于@Streamprocessor 具有@SendTo("output"),因此行从不消耗我认为应该在主题“输出”上的消息
- 我希望能够测试流处理的消息。
解决方案
您需要从您output
绑定的实际主题中消费。你有配置spring.cloud.stream.bindings.output.destination
吗?这应该是您需要使用的值。如果您不设置,默认值将与绑定相同 -output
在这种情况下。
推荐阅读
- .net - 通过 Docker 运行 .Net Core 应用程序时找不到 .XML 文件
- css - 如何使两个或更多行长表适合内联引导程序 4?
- msbuild - Cakebuild 脚本使用 vs 2017 并且构建失败
- javascript - 如何在 postgresql 中更改格式日期
- graphql - Graphql `Parameter \"obj\" to Document() 必须是一个对象,得到`
- python - 如何从带有参数的python脚本运行新终端?
- python - Python - 解析串行数据并查找特定字符串
- optaplanner - Red Hat Decision Manager 7.3 OptaCloud 求解器示例不工作
- neo4j - GraphFactory 无法实例化此 Graph 实现 [class org.apache.tinkerpop.gremlin.neo4j.structure.Neo4jGraph]]
- r - 合并源中的函数和重复的列名