首页 > 解决方案 > 将所有分区设置为它们在 Kafka 中的最新偏移量,但得到“没有当前分区分配”

问题描述

我正在尝试将所有分区设置为 Kafka 中的最新偏移量,但出现此错误:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition idp-sanitizer-qa-test-8
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:256)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:330)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1593)
    at com.company.data.process.sanitizer.e2eframework.kafka.Consumer.seekToEnd(Consumer.java:88)
    at com.company.data.process.sanitizer.e2eframework.TestRunner.main(TestRunner.java:43)

我想将所有分区设置为它们的最新偏移量,因为我不关心尚未使用的数据。但是当我重新运行我的程序时,我会关心它。

这是我正在运行的方法,但我收到了上面的错误。

public void seekToEnd() {
        logger.info("Setting Kafka Consumer to latest offset...");
        List<PartitionInfo> partitionInfoList = this.kafkaConsumer.partitionsFor(this.topic);
        ArrayList<TopicPartition> partitions =  new ArrayList<>();
        for (PartitionInfo partitionInfo : partitionInfoList) {
            partitions.add(new TopicPartition(this.topic, partitionInfo.partition()));
        }
        this.kafkaConsumer.seekToEnd(partitions);
    }

在我的构造函数中,我已经订阅了一个主题,所以this.kafkaConsumer应该订阅一个主题

this.kafkaConsumer.subscribe(Collections.singletonList(this.topic));

标签: javaapache-kafka

解决方案


推荐阅读