python - 如何为我的消费者获取所有分区的当前偏移量?
问题描述
我正在尝试获取每个可用分区的当前偏移量。根据文档,consumer.position应该可以解决问题,所以我这样尝试:
consumer = Consumer({
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.CONSUMER_GROUP,
'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
for partition_id in meta.partitions.keys():
part = TopicPartition(name, partition_id)
partitions.append(part)
# get all offsets
x = consumer.position(partitions)
但是,结果分区中的所有偏移量x
仍然是-1001
.
如果我使用镜头或其他工具进行检查,我可以看到这个结果不正确,我正在取消的消费者组已经消费了消息并将它们提交给 Kafka。
解决方案
作为参考,这是有效的解决方案:
consumer = Consumer({
'bootstrap.servers': config.BOOTSTRAP_SERVERS,
'group.id': config.CONSUMER_GROUP,
'enable.auto.commit': False,
})
# get all topics
topics = consumer.list_topics()
# get all partitions
partitions = []
for name, meta in topics.topics.items():
for partition_id in meta.partitions.keys():
part = TopicPartition(name, partition_id)
partitions.append(part)
# get last committed offsets
partitions = consumer.committed(partitions)
显然consumer.position
不像宣传的那样工作,但consumer.committed
会返回存储的偏移量,即使消费者当前没有订阅主题/分区。
推荐阅读
- python - 无法在 Raspberry PI 4 上安装 Pip 包(adafruit_circuitpython_neopixel)
- matlab - 运行代码时数组索引 Syms 错误
- python - 根据另一个熊猫的开始日期和结束日期列的条件创建新的熊猫数据框
- php - 如何使用 Wordpress Rest API 删除 url 中的 index.php
- ubuntu-20.04 - 使用 Python 2.7 在 Ubuntu 20.04 中安装 Mapnik 2.2.0
- reactjs - 输入键不适用于反应应用程序中的选项卡/键盘可访问性
- c# - 如何首先将 EF Core 代码与 azure synapse 一起使用
- c# - 如何从 LINQ 查询中的字符串中提取数字并在数据库端排序
- ruby-on-rails - 在 Rails 中编写辅助方法
- unit-testing - 单元测试Htmx?