首页 > 解决方案 > 如何在属于 AbstractConsumerSeekAware 类的 Spring Kafka 中实现 seekToEnd()?

问题描述

我有一个要求。我们有一个从 kafka 主题读取的 spring boot kafka 消费者应用程序。我们的要求是每当应用程序出现故障并出现时,我都想从最新的偏移量开始,而不是被旧值所困扰。是否有可能重置组的偏移量?我进行了一些研究,并使用 AbstractConsumerSeekAware 使用 seekToEnd() 将偏移设置到末尾,如下面的代码所示。

public class KafkaConsumer extends AbstractConsumerSeekAware {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;



    @KafkaListener(topics = "${topic.consumer}")
    public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
        //consuming the message from consumer Record.
        seekToEnd();
        doSomething();
        ack.acknowledge();
    }

但是,当我们停止应用程序并重新启动它时,它开始从它离开的最后一个偏移量开始读取,但我们只想从应用程序启动时的偏移量读取。我们怎样才能做到这一点?

标签: spring-bootapache-kafkaspring-kafkaspring-cloud-stream

解决方案


在方法中这样做是不正确的@KafkaListener。只有当消费者从分区传递记录时才会真正调用这个。

出于这个原因,您必须实现一个onPartitionsAssigned(),因此消费者在开始轮询它们之前会寻找这些分区。

在文档中查看更多信息:https ://docs.spring.io/spring-kafka/docs/current/reference/html/#seek


推荐阅读