parallel-processing - 具有动态数量的并行消费者的 Kafka 工作队列
问题描述
我想用Kafka来“分工”。我想将工作实例发布到一个主题,并运行一组相同的消费者来处理它们。随着每个消费者完成其工作,它将从主题中提取下一个工作。每件作品只能由一个消费者处理一次。处理工作很昂贵,所以我需要很多消费者在很多机器上运行才能跟上。我希望消费者的数量根据需要增加和减少(我计划为此使用 Kubernetes)。
我发现了一种为每个消费者创建一个唯一分区的模式。这“分工”,但分区的数量是在创建主题时设置的。此外,必须在命令行上创建主题,例如
bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 3 --topic divide-topic --create --replication-factor 1
...
for n in range(0,3):
consumer = KafkaConsumer(
bootstrap_servers=['localhost:9092'])
partition = TopicPartition('divide-topic',n)
consumer.assign([partition])
...
我可以为每个消费者创建一个独特的主题,并编写我自己的代码来为这些主题分配工作。这看起来很恶心,我仍然必须通过命令行创建主题。
具有动态数量的并行消费者的工作队列是一种常见的架构。我不能成为第一个需要这个的人。使用 Kafka 的正确方法是什么?
解决方案
您找到的模式是准确的。请注意,主题也可以使用Kafka Admin API创建,并且在创建主题后也可以添加分区(带有一些陷阱)。
在 Kafka 中,划分工作和允许扩展的方法是使用分区。这是因为在一个消费者组中,每个分区在任何时候都被一个消费者消费。
例如,您可以有一个具有 50 个分区的主题和一个订阅该主题的消费者组:
当吞吐量较低时,组中只能有几个消费者,他们应该能够处理流量。
当吞吐量增加时,您可以添加消费者,最多可达分区数(本例中为 50 个),以承担一些工作。
在这种情况下,50 个消费者是扩展方面的限制。消费者公开了许多指标(例如延迟),让您可以随时决定是否有足够的指标
推荐阅读
- python - 有没有办法将 tf.train.Checkpoint 与 MonitoredTrainingSession 一起使用?
- php - 使用 AJAX 以多步形式解析值的问题
- git - 如何使用 GIT cmd 将远程分支代码的副本复制到本地分支
- angular - Angular 5 - 在所有请求完成后,微调器 HttpInterceptor 有时需要很长时间才能消失
- azure - 如何使用 Azure API v2 创建虚拟机
- javascript - React - 使用引导程序创建手风琴不会在点击时展开/关闭
- c# - ASP.NET Webform如何使用Ninject.Web.Common.WebHost Prop注入
- xml - 运行 XSLT 时 REMOTE_ADDR 消失
- django - 在单个 html 页面中批量下载在多个类别下呈现的图像
- extjs - checkcolumn 网格单元格边界根据 sencha 6.0.2 中的禁用启用状态进行条件更改