首页 > 解决方案 > 如何优雅地关闭正在运行的 Kafka Consumer

问题描述

我需要根据一些数据库驱动的属性来打开/关闭 Kafka 消费者。怎么可能实现。

我想到的一种方法是:当消费者标志关闭时,从消费者那里抛出异常。容器工厂配置定义为

factory.setErrorHandler(new SeekToCurrentErrorHandler());

但它积极寻求同样的信息。

有什么办法可以关闭心跳,然后根据需要重新打开。

标签: apache-kafkakafka-consumer-apispring-kafka

解决方案


你可以stop()start()监听器容器。

看来您正在使用@KafkaListener,因为您使用的是容器工厂。

在这种情况下

@KafkaListener(id = "foo" ...)

然后使用KafkaListenerEndpointRegistry豆...

registry.getListenerContainer("foo").stop();

推荐阅读