spring-boot - 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员
问题描述
我正在使用带有recordFilterStrategy的spring-kafka。
@Bean("manualImmediateListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> manualImmediateListenerContainerFactory(
ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setPollTimeout(9999999);
factory.setBatchListener(false);
//配置手动提交offset
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setAckDiscarded(true);
factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
@Override
public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
Shipment shipment = (Shipment) consumerRecord.value();
return shipment.getType().contains("YAW");
}
});
return factory;
}
在这里,我做了 factory.setAckDiscarded(true)。当它收到应该丢弃的消息时。它将尝试确认丢弃的消息。然后它会得到一个如下所示的异常。我已经增加了 max.poll.interval.ms 并减少了批次的最大大小。任何提示将不胜感激!org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 返回的批次的最大大小来解决这个问题。
解决方案
我在kafka控制台中注意到了。它一直在为重新平衡做准备。基本我认为这个问题是由 kafka 代理引起的,除了 Spring 应用程序代码有问题外,它不稳定。
推荐阅读
- vue.js - 如何在 vue 3-rc.3 中的 .vue 文件中使用 HAML
- ruby-on-rails - 运行 rspec request spec 找不到存在的记录
- java - java.security.cert.CertPathValidatorException:Web 服务调用时出现证书链接错误
- php - HTML mail class name changes
- r - R -- lapply() Functions Not Creating a Readable Output
- r - readr and write_csv: double precision numbers and grisu3
- html - 如何在带有液体变量的html中使用填充突出显示文本
- makefile - 多依赖makefile目标
- python - 使用 UPDATE 时如何防止 SQL 数据库被锁定?
- django - 自动完成 Django 表单 jQuery