python - 如何使用具有相同组 ID 的 2 个消费者消费消息?
问题描述
topic
当使用 same和 same时,我希望能够从 2 个不同的消费者(每个消费者将收到不同的消息)消费一次数据group-id
。
制作人:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
dumps(x).encode('utf-8'))
for e in range(50):
data = {'number' : e}
print('Producer {}'.format(data))
producer.send('test', value=data)
sleep(2)
第一个消费者代码:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[1] Consume {}'.format(message))
sleep(3)
第二个消费者代码:
consumer = KafkaConsumer(
'test',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
#auto_commit_interval_ms=100,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8')))
for message in consumer:
message = message.value
print('[2] Consume {}'.format(message))
sleep(5)
我希望看到一些消息被消费,consumer-1
而其他消息被消费consumer-2
(根据consumer
代码中的 sleep 命令)
但似乎只有一个消费者在工作并获取所有消息。(第一个消费者卡住了,第二个消费者得到消息)。
我错过了什么?
解决方案
根据问题的描述,我假设您的主题只有一个分区。
如果您想在一个组中运行多个消费者,您的主题需要多个分区
推荐阅读
- python - 在来自列表的 URL inl Python 末尾的循环中添加值并附加输出
- primefaces - Primefaces DataTable - 回流不适用于 iOS
- python - 如何在两条曲线之间插值以获得python中的数据映射
- mysql - dbeaver 图标丢失 Ubuntu 20.04
- c# - POST Json Web API C#
- cypress - 赛普拉斯如何存储输入数据并解析以进行验证
- python - OGB 数据集,我无法从“ogb.nodeproppred”导入“PygNodePropPredDataset”
- python - Vscode 使用 jedi + pyright 时两次显示函数文档字符串
- visual-studio-code - 如何配置 VS Code 扩展,以便启用或禁用它们。取决于打开的工作区?
- angular - Angular:如何在 Typescript 中访问 HTML 标签类