apache-kafka - 如何在 Micronaut 中实现 HTTP 端点以暂停消费来自 Kafka 的消息?
问题描述
让我描述一下我的问题背后的理由:
- 我们有一个基于 Micronaut 的应用程序使用来自 Kafka 代理的消息。
- 消费的消息被处理并馈送到另一个远程“下游”应用程序。
- 如果此下游应用程序有意重新启动,则需要一段时间才能准备好接受来自我们基于 Micronaut 的应用程序的进一步消息。
所以我们的想法是向 Micronaut 应用程序发送一个请求,以暂停/暂停使用来自 Kafka 的消息(例如,通过 HTTP 到适当的端点)。KafkaConsumer 接口似乎有适当的方法来实现这个目标,比如
public void pause(java.util.Collection<TopicPartition> partitions)
public void resume(java.util.Collection<TopicPartition> partitions)
但是如何获取对输入到我们的 HTTP 端点的适当 KafkaConsumer 实例的引用?
我们试图将它注入到 HTTP 端点/控制器类的构造函数中,但这会产生
Error instantiating bean of type [HttpController]
Message: Missing bean arguments for type: org.apache.kafka.clients.consumer.KafkaConsumer. Requires arguments: AbstractKafkaConsumerConfiguration consumerConfiguration
可以使用 @Topic 注释的接收方法获取 KafkaConsumer 实例的引用作为方法参数,如Micronaut Kafka 文档中所述,但这会导致将此引用存储为实例变量,让 HTTP 端点访问它,等等。 ...这听起来不太令人信服:只有在收到下一条消息时,您才会获得对 KafkaConsumer 的引用!这可能适合暂停/暂停,但不适用于恢复!
顺便说一句,在保存为实例变量的引用上调用 KafkaConsumer.resume(...) 会产生
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2201)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2185)
at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1842)
[...]
我认为在实现 KafkaConsumerAware 接口来存储新创建的 KafkaConsumer 实例的引用时也是如此。
那么有什么想法如何以适当的方式处理这个问题吗?
谢谢
基督教
解决方案
推荐阅读
- laravel - 在 laravel 中查询与 carbon 的数据库关系
- javascript - 在 JSON 中隔离具有相似属性的对象
- python - 如何将每个 numpy 列中的所有非零元素分配给大小与列数相同的数组中的值?
- aws-lambda - Lambda/NodeJS 调试器不使用 Cloud9/AWS SAM 环境变量
- python-3.x - 从 pandas xlsxwriter 打开 Excel (XLSX) 文件时出错
- c - 这里的“tempPtr”不是多余的吗?(链接列表)
- java - 无法在 Spring Boot Maven 项目中集成外部 jar
- flutter - 如何在 Flutter 中制作具有浮雕效果的按钮
- c++ - 使用 mem_fn 代替 mem_fun
- django - Heroku 应用程序在使用新依赖项推送更改后崩溃