java - 如何正确查找 Kafka 中的最后一个偏移量?
问题描述
我正在尝试从最后一个偏移量读取 Kafka 主题,但我无法正确执行此操作:
在属性文件中,我设置了这些:
group.id=my_group
client.id=my_client
enable.auto.commit=true
auto.offset.reset=earliest
isolation.level=read_committed
然后代码:
consumer = new KafkaConsumer<>(props); // read properties
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
consumerRecords.forEach(consumerRecord -> System.out.println(consumerRecord.offset());
即使我在主题中有一些项目,我也可以看到如何从 0 打印偏移量。
在日志文件中我可以看到这个(缩短):
[Consumer clientId=my_client, groupId=my_group] 第 1 代组的已完成分配:{my_client-f1678be7-ce6b-48e8-acf2-741ab28f7266=Assignment(partitions=[mytopic-0])}
[Consumer clientId=my_client, groupId=my_group] 成功加入第 1 代组 [Consumer clientId=my_client, groupId=my_group] 通知分配者新的分配(partitions=[mytopic-0])
[Consumer clientId=my_client, groupId=my_group] 添加新分配的分区:mytopic-0
[Consumer clientId=my_client, groupId=my_group] 找不到分区 mytopic-0 的已提交偏移量
[Consumer clientId=my_client,> groupId=my_group] 将分区 mytopic-0 的偏移量重置为偏移量 0。
知道我在做什么错吗?我用 seekToBeginning() + poll() + commitSync() + seekToEnd() 尝试了一些魔法,它以某种方式工作,但我认为这应该默认工作。
解决方案
auto.offset.reset=最新的将解决你的问题
https://docs.confluent.io/platform/current/clients/consumer.html#offset-management
推荐阅读
- python - 如何制作蒙版以将除文本之外的所有图像背景设置为白色?
- ios - 原生 Swift 应用程序中的视频播放和加速度计都需要 WKWebView 的哪些权限?
- javascript - QWebEngineView 仅在 javascript 脚本完成后显示页面
- ontology - 描述逻辑中存在量词的使用
- json - Watson 训练数据格式
- selenium - 如何处理“ng-model”属性以在网页上查找元素?
- amazon-web-services - 如何使用 lambda 服务在 s3 中查找两个文本文件的差异
- python - 仅使用一个辅助数组来实现归并排序(而不是递归地创建新数组)
- ios - Xcode 将 IOS 应用程序作为其他项目分发
- c# - 上传文件并传递模型数据