apache-kafka - Spring Cloud Stream Kafka Binder autoCommitOnError=false 得到意外行为
问题描述
我使用的是 Spring Boot 2.1.1.RELEASE 和 Spring Cloud Greenwich.RC2,spring-cloud-stream-binder-kafka 的托管版本是 2.1.0RC4。Kafka 版本是 1.1.0。我设置了以下属性,因为如果出现错误,则不应使用消息。
spring.cloud.stream.bindings.input.group=consumer-gp-1
...
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=false
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=false
spring.cloud.stream.bindings.input.consumer.max-attempts=3
spring.cloud.stream.bindings.input.consumer.back-off-initial-interval=1000
spring.cloud.stream.bindings.input.consumer.back-off-max-interval=3000
spring.cloud.stream.bindings.input.consumer.back-off-multiplier=2.0
....
Kafka 主题中有 20 个分区,并且使用 Kerberos 进行身份验证(不确定这是否相关)。
Kafka 消费者为它处理的每条消息调用一个 Web 服务,如果 Web 服务不可用,那么我希望消费者会在继续处理下一条消息之前尝试处理该消息 3 次。因此,在我的测试中,我禁用了 Web 服务,因此无法正确处理任何消息。从日志中我可以看到这种情况正在发生。
过了一会儿,我停了下来,然后重新启动了 Kafka 消费者(网络服务仍然被禁用)。我期待在重新启动 Kafka 消费者后,它会尝试处理第一次未成功处理的消息。从 Kafka 消费者重启后的日志(我打印出每条消息及其字段)中,我看不到这种情况发生。我认为分区可能会影响某些东西,但我检查了日志,所有 20 个分区都分配给了这个单一的消费者。
有没有我错过的财产?我认为第二次重新启动消费者时的预期行为是,Kafka 代理会将未成功处理的记录再次传递给消费者。
谢谢
解决方案
参数按预期工作。见评论。
推荐阅读
- python - sqlite3.ProgrammingError:提供的绑定数量不正确。当前语句使用 1,并且提供了 0
- powershell - 问题:为什么Wait-Event不等待事件?
- python - 获取 ValueError:使用 PyInstaller 模块时找不到脚本“/src/add2vals.py”
- python - 为什么我的代码输出
- ubuntu - OpenMPI 挂起并显示“未指定协议”
- git - 如何将我的分叉回购返回到原始回购
- python - 从数据框中列的值创建一个系列
- r - R-从具有给定标题的 URL 列表中保存图像
- arrays - 为什么我不能在 Perl 中将项目推送到数组的散列中?
- html - 基于滚动显示/隐藏元素