首页 > 解决方案 > 提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员

问题描述

我正在使用带有recordFilterStrategy的spring-kafka。

@Bean("manualImmediateListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> manualImmediateListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(9999999);
        factory.setBatchListener(false);
        //配置手动提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<Object, Object>() {
            @Override
            public boolean filter(ConsumerRecord<Object, Object> consumerRecord) {
                Shipment shipment = (Shipment) consumerRecord.value();
                return shipment.getType().contains("YAW");
            }
        });
        return factory;
    }

在这里,我做了 factory.setAckDiscarded(true)。当它收到应该丢弃的消息时。它将尝试确认丢弃的消息。然后它会得到一个如下所示的异常。我已经增加了 max.poll.interval.ms 并减少了批次的最大大小。任何提示将不胜感激!org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为该组已经重新平衡并将分区分配给另一个成员。这意味着后续调用 poll() 之间的时间比配置的 max.poll.interval.ms 长,这通常意味着轮询循环花费了太多时间处理消息。您可以通过增加 max.poll.interval.ms 或通过使用 max.poll.records 减少 poll() 返回的批次的最大大小来解决这个问题。

标签: spring-bootapache-kafkaspring-kafka

解决方案


我在kafka控制台中注意到了。它一直在为重新平衡做准备。基本我认为这个问题是由 kafka 代理引起的,除了 Spring 应用程序代码有问题外,它不稳定。


推荐阅读