首页 > 解决方案 > 在 kafka-python 库的 KafkaConsumer 中限制要消费的消息数量

问题描述

示例代码:

consumer = KafkaConsumer(config["kafka"]["input"],
                         bootstrap_servers=config["kafka"]["brokers"].split(','),
                         value_deserializer=lambda m: json.loads(m.decode('ascii')),
                         enable_auto_commit=config["kafka"]["auto_commit"],
                         auto_commit_interval_ms=config["kafka"]["commit_interval"],
                         group_id=config["kafka"]["group"],
                         consumer_timeout_ms=config["kafka"]["timeout"]
                        )

尝试了 max_poll_records、fetch_max_bytes 甚至 consumer.poll() 方法都没有奏效。

标签: kafka-consumer-apikafka-python

解决方案


Confluent-Kafka python 库允许限制消息的数量。

利用:

consumer.consume(num_messages=config["kafka"]["number_of_messages_to_consumer"], timeout=config["kafka"]["timeout"])限制消息的消耗。

Kafka-Python 库不支持这个。


推荐阅读