spring-kafka - Spring Kafka Consumer Retry 具有长时间的退避间隔,给出“org.apache.kafka.clients.consumer.CommitFailedException”
问题描述
我是 Spring-Kafka 的新手,并尝试在使用 Spring Kafka RetryTemplate 处理 kafka 消息期间发生故障或任何异常时实现重试。
我使用了以下代码:
//这是KafkaListenerContainerFactory:
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(retryContext -> {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
logger.info("Recovery is called for message {} ", consumerRecord.value());
return Optional.empty();
});
return factory;
}
// 重试模板
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
// Todo: take from config
fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
// Todo: take from config
simpleRetryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
return retryTemplate;
}
//
这是消费者工厂
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
发生任何异常时,将按照重试策略按预期重试。一次,max retries用尽,它调用recovery回调方法。 但在那之后不久,它给出了“java.lang.IllegalStateException:这个错误处理程序无法处理'org.apache.kafka.clients.consumer.CommitFailedException's;没有可用的记录信息”,其中包含一些细节,例如: Failing OffsetCommit request since the consumer is不属于活动组。
似乎无法提交偏移量,因为消费者现在已从组中退出,因为它在下一次轮询之前长时间空闲(backoffperiod *(maxretry-1))。
我需要添加一些大值的 max.poll.interval.ms 吗?
有没有其他方法可以实现这一点,这样即使消费者在处理过程中花费了这么多时间并且计划以较长的时间间隔重试,也不会出现此提交失败错误。
请帮助我。
解决方案
总的 backOff 延迟必须小于max.poll.interval.ms
以避免重新平衡。
现在首选使用 aSeekToCurrentErrorHandler
而不是 a,RetryTemplate
因为这样只有每个延迟(而不是聚合)需要小于max.poll.interval.ms
推荐阅读
- javascript - 当我尝试在javascript中仅清空“完全填充”的行时,为什么我的板的行会颠倒
- c# - 创建 PowerPoint 图表时额外打开 Excel 应用程序
- javascript - 我无法使用 material-ui 中的自动完成功能
- ansible - Ansible差异过滤器未从具有相同名称的host_vars读取2个参数
- javascript - Eclipse 2021-03:JavaScript 格式化程序设置不起作用
- c# - 列出 Azure 表值
- swift - 当我在表格视图中快速下拉时,页面刷新不起作用
- codeigniter - 如何使用 Codeigniter ImageMagick 一次放置多个水印文本
- c# - 因此,如果在 _SecondaryContext.userdata_primary.InsertOneAsync(response); 之前使用 await 会有任何不同。然后 savechanges 将数据保存在 mongo 中?
- python - 使用 webp 和 jpeg 图像具有不同数量的通道