首页 > 解决方案 > 如何使用具有相同组 ID 的 2 个消费者消费消息?

问题描述

topic当使用 same和 same时,我希望能够从 2 个不同的消费者(每个消费者将收到不同的消息)消费一次数据group-id

制作人:

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                             value_serializer=lambda x:
                             dumps(x).encode('utf-8'))
    for e in range(50):
        data = {'number' : e}
        print('Producer {}'.format(data))
        producer.send('test', value=data)
        sleep(2)

第一个消费者代码:

consumer = KafkaConsumer(
        'test',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        #auto_commit_interval_ms=100,
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))

for message in consumer:
    message = message.value
    print('[1] Consume {}'.format(message))
    sleep(3)

第二个消费者代码:

consumer = KafkaConsumer(
        'test',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        #auto_commit_interval_ms=100,
        group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))

 for message in consumer:
        message = message.value
        print('[2] Consume {}'.format(message))
        sleep(5)

我希望看到一些消息被消费,consumer-1而其他消息被消费consumer-2(根据consumer代码中的 sleep 命令)

但似乎只有一个消费者在工作并获取所有消息。(第一个消费者卡住了,第二个消费者得到消息)。

我错过了什么?

标签: pythonapache-kafka

解决方案


根据问题的描述,我假设您的主题只有一个分区。

如果您想在一个组中运行多个消费者,您的主题需要多个分区


推荐阅读