java - 如何将特定偏移量中来自kafka主题的数据消耗到特定偏移量?
问题描述
我需要将特定的偏移量消耗到特定的结束偏移量!!consumer.seek() 从特定偏移量读取数据,但我需要从偏移量检索数据到 tooffset !任何帮助将不胜感激,在此先感谢。
ConsumerRecords<String, String> records = consumer.poll(100);
if(flag) {
consumer.seek(new TopicPartition("topic-1", 0), 90);
flag = false;
}
解决方案
要从起始偏移量读取消息到结束偏移量,您首先需要使用seek()
将消费者移动到所需的起始位置,然后poll()
直到您到达所需的结束偏移量。
例如,从偏移量 100 到 200 消耗:
String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Move to the desired start offset
consumer.seek(tp, 100L);
}
});
boolean run = true;
long lastOffset = 200L;
while (run) {
ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> record : crs) {
System.out.println(record);
if (record.offset() == lastOffset) {
// Reached the end offsey, stop consuming
run = false;
break;
}
}
}
}
推荐阅读
- javascript - ';' 预期的 。在函数 p5.js Javascript
- api - 商业中心:custome api getById 不起作用
- javascript - 如何连接到 git repo 并使用 nodegit 创建分支?
- c# - 用户在网站中的时区首选项并存储 DateTime.Now
- mongodb - pymongo map-reduce 无法根据查询工作
- c++ - 如何强制 CMake 忽略系统 db.h 头文件以支持 Mac 上安装的版本?
- java - getName 方法出错“无法在数组类型 String [] 上调用 getName ()”?
- javascript - JavaScript如何从数组中过滤掉一个元素
- javascript - DiscordJs 向特定的文本频道发送消息
- python - Python return 语句不会中断 for 循环