apache-kafka - 从 spring-cloud-stream 访问 kafka producer factory
问题描述
我有一个使用 spring-boot-cloud 和 apache-kafka 的项目,我有一个涵盖拓扑逻辑的集成测试列表,这要感谢 EmbeddedBroker。
我最近发现运行这些测试时日志中有很多噪音。
例如 [Producer clientId=producer-2] 无法建立到节点 0 (localhost/127.0.0.1:63267) 的连接。经纪人可能不可用。
经过一些试验和错误后,似乎这些是由 spring-cloud-stream 绑定创建的生产者。在类级别上以某种方式使用@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD),每次测试后它们似乎都没有被清理。
因此,我想如果我可以访问生产者工厂,那么我可以在我的测试类的 @AfterEach 子句中手动清理它们。我试图自动装配 KafkaTemplate 但它没有帮助。我不知道如何访问生产者工厂,因为它是由框架隐式创建的。
请注意,这些似乎不会影响测试结果,因为它们仅在测试阶段结束时出现。
提前致谢!
解决方案
您可以通过ProducerMessageHandlerCustomizer
这种方式添加 bean 并获取对生产者工厂的引用。
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler> cust() {
return (handler, dest) -> {
this.pfMap.put(dest, handler.getKafkaTemplate().getProducerFactory());
}
}
将 PF 存储在测试用例中的地图中,然后reset()
在您想要关闭生产者时将其存储。
推荐阅读
- javascript - ASP.NET 从 MVC 模型数据渲染 HTML5 画布
- javascript - 在 VueJS data() 和 v-bind: 同步表单中动态设置对象值
- mysql - (MySQLdb._exceptions.OperationalError) (1045, "Access denied for user 'DELL'@'localhost' (using password: NO)")
- java - 在转换方法 WebFlux 中从 lambda 返回 null 或可为 null 的东西
- angularjs - 使用 Python 请求模块时解析 Angular.JS
- flutter - 未处理的异常:FileSystemException:无法打开文件,路径 = 'assets/xml/strings.xml'(操作系统错误:没有这样的文件或目录,errno = 2)
- css - 如何创建一个使用带有 tailwindcss 的奇子伪选择器的类
- css - 如何在 angular primeng 中设置 p-listbox 的宽度?
- javascript - 在 Nativescript Angular 中的动画后向集合添加新项目时,ngFor 崩溃
- python - 任何人都可以使用带有字典的 for 循环来解释这个 python 语句吗?