python - 如果将group_id设置为None,Kafka消费者会收到消息,但如果不是None,它不会收到任何消息?
问题描述
我有以下 Kafka 消费者,如果将其分配group_id
给 None 效果很好 - 它接收到所有历史消息和我新测试的消息。
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
auto_offset_reset=auto_offset_reset,
enable_auto_commit=enable_auto_commit,
group_id=group_id,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for m in consumer:
group_id
但是,如果我将 设置为某个值,它不会收到任何东西。我试图运行测试生产者来发送新消息,但没有收到任何消息。
消费者控制台确实显示以下消息:
2020-11-07 00:56:01 INFO ThreadPoolExecutor-0_0 base.py(重新)加入组 my_group 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 base.py 成功加入组 my_group 与第 497 代 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 subscription_state.py 更新了分区分配:[] 2020-11-07 00:56:07 INFO ThreadPoolExecutor-0_0 consumer.py 为组 my_group 设置新分配的分区 set()
解决方案
一个topic的一个partition只能被同一个ConsumerGroup中的一个consumer消费。
如果您不设置 group.id,KafkaConsumer 将为您生成一个新的随机 group.id。由于此 group.id 是唯一的,您将看到正在使用数据。
如果您有多个使用相同 group.id 运行的消费者,则只有一个消费者会读取数据,而另一个则保持空闲状态,不消耗任何东西。
推荐阅读
- php - 将数据从 PHP 传递到 Bootstrap 模式
- python-3.x - 如何解决 JAX/Python 中的 ValueError `vector::reserve`?
- javascript - SVG 动画不适用于静态文件夹/包裹
- c++ - constexpr 函数说明
- sql - SQL,ORACLE - 创建一个视图并在选择中传递用户的授权
- python - 如何为 pandas 数据框的每一行打印索引值、列名和列数据?
- logstash-grok - Logstash 中的 Grok 过滤器问题
- python - Django 使用 ImageField 下载图像
- javascript - 从contact.js 发送数据到表单process.php
- math - 如何将 0..1 输入转换为双曲线 0..1 输出