首页 > 解决方案 > 卡夫卡消费者:我一遍又一遍地收到相同的消息,好像没有提交我已经阅读过的内容

问题描述

这是我的来源

from kafka import KafkaConsumer
import struct as _struct
import sys
from time import sleep

consumer = KafkaConsumer(
     'stringtest',
     bootstrap_servers=['10.100.121.134:9092'],
     enable_auto_commit=True,
     auto_commit_interval_ms=1000,
     group_id='my-group',
     max_poll_records=1,
     max_poll_interval_ms=1000,
     key_deserializer=lambda x: _struct.unpack('>q', x)[0],
     value_deserializer=lambda x: x.decode('utf-8'))

for message in consumer:
    print("Key = " + str(message.key) + ", Value = " + message.value + ", Topic = " + message.topic)
    if message.value == 'STOP':
        sleep(3)
        sys.exit(0)

我有一个主题中的几条消息的列表stringtest,当我运行程序时,它会给我消息列表(其中最后一条是STOP),然后停止。然后,当我再次运行它时,它给了我相同的消息列表。我怎样才能防止这种情况?我认为 sleep() 会给提交时间告诉 Kafka 这个消费者已经阅读了这些消息,但它不能这样工作。我错过了什么?

标签: pythonapache-kafkakafka-consumer-api

解决方案


推荐阅读