首页 > 解决方案 > 如何正确查找 Kafka 中的最后一个偏移量?

问题描述

我正在尝试从最后一个偏移量读取 Kafka 主题,但我无法正确执行此操作:

在属性文件中,我设置了这些:

group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=earliest
isolation.level=read_committed

然后代码:

consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());

即使我在主题中有一些项目,我也可以看到如何从 0 打印偏移量。

在日志文件中我可以看到这个(缩短):

[Consumer clientId=my_client, groupId=my_group] 第 1 代组的已完成分配:{my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=Assignment(partitions=[mytopic-0])}

[Consumer clientId=my_client, groupId=my_group] 成功加入第 1 代组 [Consumer clientId=my_client, groupId=my_group] 通知分配者新的分配(partitions=[mytopic-0])

[Consumer clientId=my_client, groupId=my_group] 添加新分配的分区:mytopic-0

[Consumer clientId=my_client, groupId=my_group] 找不到分区 mytopic-0 的已提交偏移量

[Consumer clientId=my_client,> groupId=my_group] 将分区 mytopic-0 的偏移量重置为偏移量 0。

知道我在做什么错吗?我用 seekToBeginning() + poll() + commitSync() + seekToEnd() 尝试了一些魔法,它以某种方式工作,但我认为这应该默认工作。

标签: javaapache-kafkakafka-consumer-api

解决方案



推荐阅读