首页 > 解决方案 > 处理消息后关闭 kafka 消费者

问题描述

我正在使用@KafkaListener(topics = "${topic}")来自 spring-boot 应用程序中的主题的消息,并且我需要定期运行它。spring-kafka 版本是 2.2.4.RELEASE。

实现此目的的一种方法可能是每 6 小时使用 批处理一次fetch.max.wait.ms,但对于此配置而言,6 小时似乎太多了。

因此,我正在寻找一种在处理后关闭应用程序并每 6 小时重新启动一次的方法。

其他方式如下所示,但它不能保证应用程序已在睡眠时间内完成处理(下例中为 30 秒)。

public class Application {
    public static void main(String[] args) throws InterruptedException {
        ConfigurableApplicationContext run = SpringApplication.run(Application.class, args);
        Thread.sleep(30000);
        run.close();
    }
}

关闭消费者的优雅方式是什么,以确保仅在处理完一批消息后才会关闭?

标签: spring-kafka

解决方案


看到这个答案

当所有容器实例都空闲时关闭应用程序。


推荐阅读