首页 > 解决方案 > 如何在 Micronaut 中实现 HTTP 端点以暂停消费来自 Kafka 的消息?

问题描述

让我描述一下我的问题背后的理由:

所以我们的想法是向 Micronaut 应用程序发送一个请求,以暂停/暂停使用来自 Kafka 的消息(例如,通过 HTTP 到适当的端点)。KafkaConsumer 接口似乎有适当的方法来实现这个目标,比如

public void pause​(java.util.Collection<TopicPartition> partitions)
public void resume​(java.util.Collection<TopicPartition> partitions)

但是如何获取对输入到我们的 HTTP 端点的适当 KafkaConsumer 实例的引用?

我们试图将它注入到 HTTP 端点/控制器类的构造函数中,但这会产生

Error instantiating bean of type  [HttpController]
Message: Missing bean arguments for type: org.apache.kafka.clients.consumer.KafkaConsumer. Requires arguments: AbstractKafkaConsumerConfiguration consumerConfiguration

可以使用 @Topic 注释的接收方法获取 KafkaConsumer 实例的引用作为方法参数,如Micronaut Kafka 文档中所述,但这会导致将此引用存储为实例变量,让 HTTP 端点访问它,等等。 ...这听起来不太令人信服:只有在收到下一条消息时,您才会获得对 KafkaConsumer 的引用!这可能适合暂停/暂停,但不适用于恢复!

顺便说一句,在保存为实例变量的引用上调用 KafkaConsumer.resume(...) 会产生

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2201)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2185)
    at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1842)
    [...]

我认为在实现 KafkaConsumerAware 接口来存储新创建的 KafkaConsumer 实例的引用时也是如此。

那么有什么想法如何以适当的方式处理这个问题吗?

谢谢

基督教

标签: apache-kafkakafka-consumer-apimicronaut

解决方案


推荐阅读