首页 > 解决方案 > 从 spring-cloud-stream 访问 kafka producer factory

问题描述

我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,我有一个涵盖拓扑逻辑的集成测试列表,这要感谢 EmbeddedBroker。

我最近发现运行这些测试时日志中有很多噪音。

例如 [Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接。经纪人可能不可用。

经过一些试验和错误后,似乎这些是由 spring-cloud-stream 绑定创建的生产者。在类级别上以某种方式使用@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD),每次测试后它们似乎都没有被清理。

因此,我想如果我可以访问生产者工厂,那么我可以在我的测试类的 @AfterEach 子句中手动清理它们。我试图自动装配 KafkaTemplate 但它没有帮助。我不知道如何访问生产者工厂,因为它是由框架隐式创建的。

请注意,这些似乎不会影响测试结果,因为它们仅在测试阶段结束时出现。

提前致谢!

标签: apache-kafkaspring-cloud-stream

解决方案


您可以通过ProducerMessageHandlerCustomizer这种方式添加 bean 并获取对生产者工厂的引用。

@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler> cust() {
    return (handler, dest) -> {
        this.pfMap.put(dest, handler.getKafkaTemplate().getProducerFactory());
    }
}

将 PF 存储在测试用例中的地图中,然后reset()在您想要关闭生产者时将其存储。


推荐阅读