spring-boot - Spring-Cloud-Streams Kafka - 如何阻止消费者
问题描述
我有一个 Spring-Cloud-Streams 客户端读取由多个分区组成的 Kakfa 主题。客户端为它读取的每条 Kafka 消息调用一个 Web 服务。如果重试几次后 web 服务不可用,我想阻止消费者从 Kafka 读取。参考之前的 Stackoverflow 问题(Spring cloud stream kafka pause/resume binders),我自动装配BindingsEndpoint并调用changeState()方法来尝试停止消费者,但日志显示消费者在changeState()之后继续从 Kafka 读取消息调用。
我正在使用 Spring Boot 版本 2.1.2.RELEASE 和 Spring Cloud 版本 Greenwich.RELEASE。spring-cloud-stream-binder-kafka 的托管版本是 2.1.0.RELEASE。我已设置属性 autoCommitOffset=true 和 autoCommitOnError=false。
以下是我的代码片段。有什么我错过的吗?changeState()的第一个输入参数应该是主题名称吗?
如果我希望消费者应用程序在 web 服务不可用时退出,我可以简单地执行 System.exit() 而无需先停止消费者吗?
@Autowired
private BindingsEndpoint bindingsEndpoint;
...
...
@StreamListener(MyInterface.INPUT)
public void read(@Payload MyDTO dto,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
try {
logger.info("Processing message "+dto);
process(dto); // this is the method that calls the webservice
} catch (Exception e) {
if (e instanceof IllegalStateException || e instanceof ConnectException) {
bindingsEndpoint.changeState("my.topic.name",
BindingsEndpoint.State.STOPPED);
// Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object
}
e.printStackTrace();
throw (e);
}
}
解决方案
您可以通过使用绑定可视化和控制功能来实现这一点,您可以在其中可视化以及停止/启动/暂停/恢复绑定。
另外,你知道这System.exit()
会关闭整个JVM吗?
推荐阅读
- c# - 通过网络复制到本地用户
- powershell - Powershell 调用 Robocopy 以获取前 32 个最大文件的总大小不起作用
- python - 我正在尝试从 python 正则表达式解析字符串-无法获得所需的输出
- python - 如何从现有的 csv 文件创建 csv 文件?
- spring-boot - kafka 什么时候会重试处理未确认的消息?
- javascript - Angular 8 - 动画图像 src 更改
- java - 无法使用 docker-compose 运行 docker 应用程序 - 显示 NoClassDefFoundError: io/dropwizard/Application
- apache-flink - Flink 的架构内部
- c++ - 构造函数初始化指针数据成员的问题
- python - 按降序计算值列表的频率及其相关的值百分比更高