Spring Cloud Stream Kafka commit failed due to group rebalance

Problem description

I am getting a CommitFailedException in some of my Spring Cloud Stream applications that take a long time to process messages. I know that to fix this I need to set max.poll.records and max.poll.interval.ms to match my expectations for how long processing a batch takes. However, I am not quite sure how to set these for the consumer in Spring Cloud Stream.

Exception:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:808)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:691)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1416)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1377)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1554)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1418)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:739)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)

Also, how can I make sure this situation does not happen at all? Or alternatively, how can I inject some kind of rollback when this exception occurs? The reason is that I am doing some other external work and, once it is finished, I publish the output message accordingly. Therefore, if the message cannot be published for any reason after the work has been done on the external system, I have to revert it (some kind of atomic transaction spanning the Kafka publish and the other external system).

Tags: apache-kafka, spring-kafka, spring-cloud-stream

Solution


You can set arbitrary Kafka properties at the binder level; see the documentation here:

spring.cloud.stream.kafka.binder.consumerProperties

Key/Value map of arbitrary Kafka client consumer properties. In addition to supporting known Kafka consumer properties, unknown consumer properties are allowed here as well. Properties here supersede any properties set in boot and in the configuration property above.

Default: Empty map.

For example: spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
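Put concretely, the binder-level settings in application.properties might look like the sketch below. The values 10 and 600000 are illustrative assumptions, not recommendations; size them to the time your application actually needs to process one batch:

```properties
# Binder level: applies to every consumer binding of this Kafka binder (values are examples)
spring.cloud.stream.kafka.binder.consumerProperties.max.poll.records=10
spring.cloud.stream.kafka.binder.consumerProperties.max.poll.interval.ms=600000
```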

Or at the binding level; see the documentation here:

spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration

Map with a key/value pair containing generic Kafka consumer properties. In addition to Kafka consumer properties, other configuration properties can be passed here as well, for example properties needed by the application, such as spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar.

Default: Empty map.

For example: spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
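As a sketch, a binding-level override for a binding named input might look like this in application.properties (again, the values are illustrative; binding-level configuration takes precedence over binder-level properties):

```properties
# Binding level: applies only to the "input" binding (values are examples)
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=10
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.interval.ms=600000
```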

You can get notified of commit failures by adding an OffsetCommitCallback to the listener container's ContainerProperties and setting syncCommits to false. To customize the container and its properties, add a ListenerContainerCustomizer bean to the application.

EDIT

Async commit callback...

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            container.getContainerProperties().setAckMode(AckMode.RECORD);
            container.getContainerProperties().setSyncCommits(false);
            container.getContainerProperties().setCommitCallback((map, ex) -> {
                if (ex == null) {
                    System.out.println("Successful commit for " + map);
                }
                else {
                    System.out.println("Commit failed for " + map + ": " + ex.getMessage());
                }
            });
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}

Manual commit (sync)...

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class So57970152Application {

    public static void main(String[] args) {
        SpringApplication.run(So57970152Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer() {
        return (container, dest, group) -> {
            container.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
            container.getContainerProperties().setClientId("so57970152");
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment ack) {
        System.out.println(in);
        try {
            ack.acknowledge(); // MUST USE MANUAL_IMMEDIATE for this to work.
            System.out.println("Commit successful");
        }
        catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("input", "foo".getBytes());
        };
    }

}
