首页 > 解决方案 > 为什么 Kafka 的 seekToBeginning 和 seekToEnd 不能与 assign 一起使用?

问题描述

说,我想检查特定分区的 Kafka 中第一条和最后一条消息的偏移量。我的想法是将该assign(…)方法与seekToBeginning(…)and一起使用seekToEnd(…)。不幸的是,这不起作用。

如果我设置AUTO_OFFSET_RESET_CONFIG"latest"seekToBeginning(…)则没有效果;如果我将其设置为"earliest"seekToEnd(…)则不起作用。似乎对我的消费者来说唯一重要的是AUTO_OFFSET_RESET_CONFIG.

我看过一个类似的话题,但问题是处理的subscribe(),而不是assign()方法。建议的解决方案是实现ConsumerRebalanceListner并将其作为参数传递给subscribe()方法。不幸的是,该assign()方法只有一个签名,并且只能获取主题分区列表。

问题是:是否可以使用orseekToBeginning()方法。如果是,如何?如果不是,为什么?seekToEnd()assign()

我的代码的相关片段:

KafkaConsumer<String, ProtoMeasurement> consumer = createConsumer();
TopicPartition zeroP = new TopicPartition(TOPIC, 1);
List<TopicPartition> partitions = Collections.singletonList(zeroP);

consumer.assign(partitions);
consumer.poll(Duration.ofSeconds(1));
consumer.seekToBeginning(partitions);
long currOffsetPos = consumer.position(zeroP);
LOGGER.info("Current offset {}.", currOffsetPos);
ConsumerRecords<String, ProtoMeasurement> records = consumer.poll(Duration.ofMillis(100));
// ...

记录器打印偏移量 n,这是所考虑主题的最大(最新)偏移量。

标签: javaapache-kafkakafka-consumer-api

解决方案


我注意到这种行为在 MockConsumer 中有错误且不一致。文档说他们是懒惰的,但会在 position() 调用后触发。但对于 MockConsumer 来说,情况并非如此。特别是,我发现它在大约 1.0 和 2.2.2 之间适用于 MockConsumer,并且在 2.3.0 之后被破坏

取而代之的是,我选择执行以下操作,这在 MockConsumer 和真实的情况下始终有效:

// consistently working seed to beginning
consumer.beginningOffsets(partitions).forEach(consumer::seek);
// consistently working seed to end
consumer.endOffsets(partitions).forEach(consumer::seek);

如果有线程同时调用轮询,这会更危险,但在我的情况下效果很好,我只想在应用程序开始轮询时手动控制偏移位置。


推荐阅读