首页 > 解决方案 > Kafka恢复消费者无法收到第一条消息

问题描述

我需要暂停 Kafka 消费者从主题中消费消息,直到消息到达它的等待时间。为此,我在 Kafka 中使用了暂停/恢复方法。但是当我恢复时,暂停前消耗的第一条消息将不会再次收到。但是,自从我手动确认后,主题的偏移量仍然没有更新(滞后是一个)。

@StreamListener(ChannelName.MESSAGE_INPUT_RETRY_CHANNEL)
public void onMessageRetryReceive(org.springframework.messaging.Message<Message> message, @Header(KafkaHeaders.CONSUMER)KafkaConsumer<?,?> consumer){

    long waitTime = //Calculate the wait time of the message
    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if(waitTime > 0){
        consumer.pause(Collections.singleton(new TopicPartition("message-retry-topic",0)));
    }else{
        messageProducer.sendMessage(message.getPayload());
        acknowledgment.acknowledge();
    }
}


@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
    return event -> {
        boolean isReady = //Logi to check if ready to resume
        if(isReady){
            event.getConsumer().resume(event.getConsumer().paused());
        }
    };
}

这涉及到KafkaConsumer resume partition cannot continue to receive uncommitted messages中提到的问题。但我不确定 seeks 方法如何有助于检索第一条消费消息。我正在使用弹簧云流。我需要一些建议

标签: apache-kafkaspring-cloud-stream

解决方案


您不调用的事实acknowledgment.acknowledge();并不意味着您的KafkaConsumer实例不会保留内存中最后使用的位置。

我们肯定需要为分区上的后续消费者提交偏移量。当前运行的消费者不需要提交此类信息,因为它具有自己的内存状态。

为了能够重新使用相同的记录,您需要执行seek()操作。有关更多信息,请参阅文档:https ://docs.spring.io/spring-kafka/docs/current/reference/html/#seek


推荐阅读