首页 > 解决方案 > 手动分配到特定分区的 Kafka 消费者在轮询时不返回任何内容

问题描述

我有一个带有 3 个分区的 Kafka 主题。
我正在尝试创建一个测试使用者以从每个分区中获取最后 N 条消息。

为此,我手动分配给每个分区,移动偏移量并进行如下轮询:

val topicPartition = TopicPartition(topic, 1) // where 1 is the number of a partition
consumer.assign(listOf(topicPartition))
consumer.seekToEnd(listOf(topicPartition))
val lastOffset = consumer.position(topicPartition)
consumer.seek(topicPartition, lastOffset - N) // lastOffset is known to be > N
val consumerRecords = consumer.poll(Duration.ofMillis(10000))

我对所有 3 个分区重复此操作。
这适用于 3 个分区中的 2 个。
令人惊讶的是,这对一个(总是相同的)分区不起作用,因此poll()总是等待给定的超时并且什么都不返回。
笔记:

consumerProps["auto.offset.reset"] = "earliest"
consumerProps["max.poll.records"] = 500
consumerProps["fetch.max.bytes"] = 50000000
consumerProps["max.partition.fetch.bytes"] = 50000000

这种行为的原因可能是什么?

标签: apache-kafkakafka-consumer-api

解决方案


推荐阅读