java - 将所有分区设置为它们在 Kafka 中的最新偏移量,但得到“没有当前分区分配”
问题描述
我正在尝试将所有分区设置为 Kafka 中的最新偏移量,但出现此错误:
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition idp-sanitizer-qa-test-8
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:256)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.requestOffsetReset(SubscriptionState.java:330)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1593)
at com.company.data.process.sanitizer.e2eframework.kafka.Consumer.seekToEnd(Consumer.java:88)
at com.company.data.process.sanitizer.e2eframework.TestRunner.main(TestRunner.java:43)
我想将所有分区设置为它们的最新偏移量,因为我不关心尚未使用的数据。但是当我重新运行我的程序时,我会关心它。
这是我正在运行的方法,但我收到了上面的错误。
public void seekToEnd() {
logger.info("Setting Kafka Consumer to latest offset...");
List<PartitionInfo> partitionInfoList = this.kafkaConsumer.partitionsFor(this.topic);
ArrayList<TopicPartition> partitions = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfoList) {
partitions.add(new TopicPartition(this.topic, partitionInfo.partition()));
}
this.kafkaConsumer.seekToEnd(partitions);
}
在我的构造函数中,我已经订阅了一个主题,所以this.kafkaConsumer
应该订阅一个主题
this.kafkaConsumer.subscribe(Collections.singletonList(this.topic));
解决方案
推荐阅读
- jquery-select2 - 当我使用 select2 时,谷歌浏览器中的页面加载速度很慢。在 IE11 和 Firefox 中运行良好
- sql-server - 如果产品在不同地区的不同时间推出,如何获取最近一季度推出的产品列表
- python - 将图像列表重塑为 CNN 的正确格式
- php - 如何使用当前站点 cookie 登录到另一个站点,然后重定向到另一个页面并发出 CURL 请求?
- php - 如何显示数组中的多个特定列并显示为一个字符串
- javascript - 如果使用 await,setTimeout 的同步使用会停止代码吗?
- c - 数组声明没有大小和 const volatile
- vba - 如何让我的 VBA A* 寻路算法返回对角线路径?
- 3d - ARKit + SceneKit:光照快速增加多边形数量
- c# - 服务如何将通用 JsonPatchDocument<> 转移到另一个服务?