python - 卡夫卡消费者:我一遍又一遍地收到相同的消息,好像没有提交我已经阅读过的内容
问题描述
这是我的来源
from kafka import KafkaConsumer
import struct as _struct
import sys
from time import sleep
consumer = KafkaConsumer(
'stringtest',
bootstrap_servers=['10.100.121.134:9092'],
enable_auto_commit=True,
auto_commit_interval_ms=1000,
group_id='my-group',
max_poll_records=1,
max_poll_interval_ms=1000,
key_deserializer=lambda x: _struct.unpack('>q', x)[0],
value_deserializer=lambda x: x.decode('utf-8'))
for message in consumer:
print("Key = " + str(message.key) + ", Value = " + message.value + ", Topic = " + message.topic)
if message.value == 'STOP':
sleep(3)
sys.exit(0)
我有一个主题中的几条消息的列表stringtest
,当我运行程序时,它会给我消息列表(其中最后一条是STOP
),然后停止。然后,当我再次运行它时,它给了我相同的消息列表。我怎样才能防止这种情况?我认为 sleep() 会给提交时间告诉 Kafka 这个消费者已经阅读了这些消息,但它不能这样工作。我错过了什么?
解决方案
推荐阅读
- c# - 如何更改视觉工作室折叠功能
- javascript - Laravel,用ajax渲染谷歌地址api
- mysql - 测试和模拟 bufio.NewReader(os.Stdin) | 读取符文()
- visual-studio-2019 - 查找文本在 Visual Studio 2019 中不再起作用
- sql - 如何在 postgresql 中为 regexp_matches 创建索引?
- php - 如何在 WooCommerce 中更改 sold_individually 产品属性
- c# - 如何从另一个线程在 PictureBox 上绘图?
- ios - 无法在 iOS 上从 Unity 运行 Vuforia 应用程序。代码签名无效
- xml - PLSQL解析GML
- android - 如何使用警报管理器将数据从片段传递到广播接收器