首页 > 解决方案 > 如何为我的消费者获取所有分区的当前偏移量?

问题描述

我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position应该可以解决问题,所以我这样尝试:

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})

# get all topics
topics = consumer.list_topics()

# get all partitions
partitions = []
for name, meta  in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)

# get all offsets
x = consumer.position(partitions)

但是,结果分区中的所有偏移量x仍然是-1001.

如果我使用镜头或其他工具进行检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。

标签: pythonapache-kafkakafka-consumer-apiconfluent-platform

解决方案


作为参考,这是有效的解决方案:

consumer = Consumer({
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.CONSUMER_GROUP,
    'enable.auto.commit': False,
})

# get all topics
topics = consumer.list_topics()

# get all partitions
partitions = []
for name, meta  in topics.topics.items():
    for partition_id in meta.partitions.keys():
        part = TopicPartition(name, partition_id)
        partitions.append(part)

# get last committed offsets
partitions = consumer.committed(partitions)

显然consumer.position不像宣传的那样工作,但consumer.committed会返回存储的偏移量,即使消费者当前没有订阅主题/分区。


推荐阅读