首页 > 解决方案 > Spring-Cloud-Streams Kafka - 如何阻止消费者

问题描述

我有一个 Spring-Cloud-Streams 客户端读取由多个分区组成的 Kakfa 主题。客户端为它读取的每条 Kafka 消息调用一个 Web 服务。如果重试几次后 web 服务不可用,我想阻止消费者从 Kafka 读取。参考之前的 Stackoverflow 问题(Spring cloud stream kafka pause/resume binders),我自动装配BindingsEndpoint并调用changeState()方法来尝试停止消费者,但日志显示消费者在changeState()之后继续从 Kafka 读取消息调用。

我正在使用 Spring Boot 版本 2.1.2.RELEASE 和 Spring Cloud 版本 Greenwich.RELEASE。spring-cloud-stream-binder-kafka 的托管版本是 2.1.0.RELEASE。我已设置属性 autoCommitOffset=true 和 autoCommitOnError=false。

以下是我的代码片段。有什么我错过的吗?changeState()的第一个输入参数应该是主题名称吗?

如果我希望消费者应用程序在 web 服务不可用时退出,我可以简单地执行 System.exit() 而无需先停止消费者吗?

@Autowired
private BindingsEndpoint bindingsEndpoint;

...
...
@StreamListener(MyInterface.INPUT)  
    public void read(@Payload MyDTO dto,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {

            logger.info("Processing message "+dto);
            process(dto); // this is the method that calls the webservice


        } catch (Exception e) {

            if (e instanceof IllegalStateException || e instanceof ConnectException) {

                bindingsEndpoint.changeState("my.topic.name", 
                    BindingsEndpoint.State.STOPPED);    
                // Binding<?> b = bindingsEndpoint.queryState("my.topic.name"); ==> Using topic name returns a valid Binding object                     
            }

                e.printStackTrace();
                throw (e);
  }
}

标签: spring-bootapache-kafkaspring-cloud-stream

解决方案


您可以通过使用绑定可视化和控制功能来实现这一点,您可以在其中可视化以及停止/启动/暂停/恢复绑定。

另外,你知道这System.exit()会关闭整个JVM吗?


推荐阅读