python - 更快地使用 Kafka-topic 中的所有消息
问题描述
我们的团队正在将 Kafka 与 Flask 应用程序集成以实时显示数据,但我们也希望显示来自 Kafka 的历史数据。
因此,我们的想法是,我们使用来自特定主题的所有消息并将数据显示给我们的用户。但是,当我们设置 Avro Consumer 来轮询整个主题的消息时,我们每分钟只能消费 100k-200k 条消息,这太慢了,因为每个主题大约有 250 万条消息。即使我们使用相同的 group-id 设置多个消费者,我们仍然没有太大的性能改进。
关于如何以更快的方式从 kafka 主题获取所有消息的任何提示?还是将数据保存到数据库然后从那里查询数据会更好?
我们的消费者:
c = Consumer({
'bootstrap.servers': 'brokers:9092',
'group.id': 'consume_all_topics',
'auto.offset.reset': 'earliest'
})
c.subscribe(['mytopic'])
now = datetime.now()
msg = c.poll(5.0)
while msg.value()['timestamp'] < now:
msg = c.poll(5.0)
解决方案
“即使我们使用相同的 group-id 设置多个消费者,我们仍然没有太大的性能提升。
有关如何以更快的方式从 kafka 主题获取所有消息的任何提示?”
Kafka 的使用量随主题中的分区数量而变化。请记住,一个分区只能由一个消费者组中的一个消费者使用。如果分区数量与消费者组中的消费者数量相匹配,您将获得最佳消费者性能。
此外,如果您对数据使用压缩(例如zstd
,在 2.2.x 版本中可用),您的消耗可能会增加。请注意,理想情况下,压缩应该在生产者端处理。
推荐阅读
- vim - 当我将此键映射到另一个键时,如何让vim打印我直接按下的键?
- php - 在特定查询上避免 laravel 全局范围
- sql-server - 如果它们的键在一组中,则更新行
- java - 如何使用 -Dloader.main 将应用程序参数传递给 Spring Boot 应用程序
- flutter - 如何在特定点停止动画?
- .net-core - CascadingParameter 和 ChildContent 的不同行为在 Blazor 中创建缩进的子组件
- excel - 将文件从excel列表复制到其他文件夹
- java - 在下载签名小程序之前执行哪些控制?
- excel - Excel 宏另存为 PDF
- javascript - 为什么我们不能在 Electron 中打开链接?