首页 > 解决方案 > Spring Cloud Stream Kafka Binder autoCommitOnError=false 得到意外行为

问题描述

我使用的是 Spring Boot 2.1.1.RELEASE 和 Spring Cloud Greenwich.RC2,spring-cloud-stream-binder-kafka 的托管版本是 2.1.0RC4。Kafka 版本是 1.1.0。我设置了以下属性,因为如果出现错误,则不应使用消息。

spring.cloud.stream.bindings.input.group=consumer-gp-1
...
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=false
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=false
spring.cloud.stream.bindings.input.consumer.max-attempts=3
spring.cloud.stream.bindings.input.consumer.back-off-initial-interval=1000  
spring.cloud.stream.bindings.input.consumer.back-off-max-interval=3000 
spring.cloud.stream.bindings.input.consumer.back-off-multiplier=2.0  
.... 

Kafka 主题中有 20 个分区,并且使用 Kerberos 进行身份验证(不确定这是否相关)。

Kafka 消费者为它处理的每条消息调用一个 Web 服务,如果 Web 服务不可用,那么我希望消费者会在继续处理下一条消息之前尝试处理该消息 3 次。因此,在我的测试中,我禁用了 Web 服务,因此无法正确处理任何消息。从日志中我可以看到这种情况正在发生。

过了一会儿,我停了下来,然后重新启动了 Kafka 消费者(网络服务仍然被禁用)。我期待在重新启动 Kafka 消费者后,它会尝试处理第一次未成功处理的消息。从 Kafka 消费者重启后的日志(我打印出每条消息及其字段)中,我看不到这种情况发生。我认为分区可能会影响某些东西,但我检查了日志,所有 20 个分区都分配给了这个单一的消费者。

有没有我错过的财产?我认为第二次重新启动消费者时的预期行为是,Kafka 代理会将未成功处理的记录再次传递给消费者。

谢谢

标签: apache-kafkastreamspring-cloud-stream

解决方案


参数按预期工作。见评论。


推荐阅读