spring-kafka - 如何部署处于暂停模式的 kafka 消费者,直到我发出开始使用消息的信号
问题描述
我正在使用 spring-kafka 2.2.8 并试图了解是否可以选择部署处于暂停模式的 kafka 使用者,直到我发出信号开始使用消息。请建议。
我在下面的帖子中看到,我们可以暂停并启动消费者,但我需要消费者在部署时处于暂停模式。 如何使用 spring-kafka 暂停和恢复 @KafkaListener
解决方案
@KafkaListener(id = "foo", ..., autoStartup = "false")
然后KafkaListenerEndpointRegistry
在你准备好时使用
registry.getListenerContainer("foo").start();
在暂停模式下启动它没有多大意义,但你可以这样做......
@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}
@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}
分配分区时,您将看到如下日志消息:
由于重新平衡,Kafka 恢复了暂停的消费者;消费者再次暂停,因此最初的 poll() 将永远不会返回任何记录
推荐阅读
- reactjs - 从 React 中的另一个属性访问属性值
- node.js - express.js 不影响响应
- html - 为什么“溢出:隐藏;” 隐藏导航栏上方和下方的空间?
- javascript - 更改 Buefy 库 (Vue.js) 的 TagInput 元素中的默认行为?
- javascript - 知道字符串的一个单词是否在数组中 Node JS
- wordpress - WordPress Gutenberg:动态块不加载帖子列表
- azure - 如何自定义默认工作项状态的颜色
- google-chrome-extension - 使用 Chrome 扩展程序进行 Google 转化跟踪
- python-3.x - 为命令创建冷却时间
- excel - VBScript 在保存和关闭 Excel 工作簿之前结束