kafka-consumer-api - 在 kafka-python 库的 KafkaConsumer 中限制要消费的消息数量
问题描述
示例代码:
consumer = KafkaConsumer(config["kafka"]["input"],
bootstrap_servers=config["kafka"]["brokers"].split(','),
value_deserializer=lambda m: json.loads(m.decode('ascii')),
enable_auto_commit=config["kafka"]["auto_commit"],
auto_commit_interval_ms=config["kafka"]["commit_interval"],
group_id=config["kafka"]["group"],
consumer_timeout_ms=config["kafka"]["timeout"]
)
尝试了 max_poll_records、fetch_max_bytes 甚至 consumer.poll() 方法都没有奏效。
解决方案
Confluent-Kafka python 库允许限制消息的数量。
利用:
consumer.consume(num_messages=config["kafka"]["number_of_messages_to_consumer"], timeout=config["kafka"]["timeout"])
限制消息的消耗。
Kafka-Python 库不支持这个。
推荐阅读
- vba - 正在以符号格式复制 PDF 文本
- python - 在 SPARK python 中用空格替换双引号
- android - 为什么 Firebase App Distribution 声称新的 Android 应用版本已降级?
- linux - 模拟 docker 镜像的 hdd
- python - KeyError: '"param1"' 创建字典字符串时
- sanic - 多个带电机的 Sanic 工人
- angular - 在 Angular 6 中定义全局变量的全局服务
- mobile - Instagram 和 Twitter 如何在应用上同步内容
- java - Springboot java application-mysql CommunicationsException:通信链路故障
- angular - 角材料垫按钮不会更新外部指令的禁用更改