pykafka - confluent-kafka-python 库:每个消费者组每个主题的读取偏移量
问题描述
由于pykafka EOL,我们正在迁移到confluent-kafka-python。因为pykafka
我们编写了一个详细的脚本,它以以下格式生成输出:
话题 | 消费群体 | 抵消 |
---|---|---|
topic_alpha | total_messages | 100 |
topic_alpha | 消费者_a | 10 |
topic_alpha | 消费者_b | 25 |
我想知道是否有一个 Python 代码知道如何为confluent-kafka-python
?
小字:有一个关于如何读取每个给定 consumer_group 的偏移量的部分示例。但是,我很难在consumer_group
不手动解析的情况下获取每个主题的列表__consumer_offsets
。
解决方案
用于admin_client.list_groups()
获取组列表,并admin_client.list_topics()
获取集群中和client.get_watermark_offsets()
给定主题的所有主题和分区。
然后为每个消费者组实例化一个具有相应的新消费者group.id
,创建一个 TopicPartition 列表来查询已提交的偏移量,然后调用c.committed()
以检索已提交的偏移量。从高水位线中减去承诺的偏移量得到
推荐阅读
- python - 如何从另一个文件中按值更改csv列中的值
- angular - 如果 'app-fixed-topbar' 是一个 Angular 组件,那么验证它是这个模块的一部分
- email-verification - 我无法通过 wp-admin 访问我的站点管理仪表板
- angular6 - 在angular6项目中没有弹出窗口
- cmake - Visual Studio 中的显式类型转换
- excel - 如何对我的 Excel VBA 代码使用错误处理
- deep-learning - 此时将新数据添加到训练集中不会提高训练准确性
- jquery - 当我使用 jquery-ui 自动完成在文本区域中键入时,如何在任何位置实现自动完成?
- angular - @ViewChild() 来自另一个组件
- java - Thymeleaf 3 语法迭代列表
- >