python - 如何在 python 中扩展 Kafka 消费者?
问题描述
这可能有多个问题,所以请耐心等待。我仍在寻找使用 Kafka 架构的正确方法。我知道一个主题的分区是按消费者划分的。
究竟什么是消费者?现在,我正在考虑编写一个充当消费者的守护程序 python 进程。当消费者消费来自 Kafka 的消息时,有一个任务是我必须完成的。这是一项艰巨的任务,因此我正在创建同时运行的子任务。我可以在同一台机器上拥有多个消费者(python 脚本)吗?
我有多个正在处理的微服务,所以每个微服务都有自己的消费者?
当负载增加时,我必须扩展消费者。我想产生一台新机器,充当另一个消费者。但我只是觉得我在这里做错了,觉得必须有更好的方法。
你能告诉我你是如何根据负载来扩展你的消费者的吗?如果我需要增加我的消费者,我是否必须增加我的主题分区?我如何动态地做到这一点?当产生的消息较少时,我可以减少分区吗?最初有多少个分区是理想的?
并请提出一些可遵循的良好做法。
这是我正在使用的消费者脚本
while True:
message = client.poll(timeout=10)#client is the KafkaConsumer object
if message is not None:
if message.error():
raise KafkaException(message.error())
else:
logger.info('recieved topic {topic} partition {partition} offset {offset} key {key} - {value}'.format(
topic=message.topic(),
partition=message.partition(),
offset=message.offset(),
key=message.key(),
value=message.value()
))
#run task
解决方案
我可以在同一台机器上拥有多个消费者(python 脚本)吗?
是的。不过,您也可以使用 Python 线程。
如果您不使用多个主题,则不需要多个消费者。
究竟什么是消费者?
随意阅读 Apache Kafka 网站...
每个微服务都有自己的消费者?
每个服务是否运行类似的代码?好的。
我想生产一台新机器
在一台机器上生成应用程序的新实例。监控 CPU 和内存以及网络负载。在至少其中一台在正常处理下超过 70% 之前,不要购买新机器。
如果我需要增加我的消费者,我是否必须增加我的主题分区?
一般来说,是的。一个消费者组中的消费者数量受订阅主题的分区数量限制。
当产生的消息较少时,我可以减少分区吗?
不,不能减少分区
当负载增加时,我必须扩展消费者
不必要。增加的负荷是不断上升,还是一波又一波?如果可变,那么您可以让 Kafka 缓冲消息。消费者将尽可能快地进行轮询和处理。
您需要定义您的 SLA,以了解从生产者到达主题后处理消息需要多长时间。
最初有多少个分区是理想的?
关于这方面的文章有很多,具体取决于您自己的硬件和应用程序要求。只需记录每条消息,您就可以拥有数千个分区......
当消费者消费来自Kafka的消息时,有一个任务我必须完成
听起来你可能想看看 Celery,而不仅仅是 Kafka。您还可以查看 Faust进行 Kafka 处理
推荐阅读
- reactjs - 简单的反应片段未显示在侧栏中
- javascript - Pattern for lookup values that support UIs (i.e. Country Codes, State Codes, etc) within React
- python - 使用 Python 的 struct 库打包 None 类型
- javascript - 如何打印开关的 ON/OFF 状态?
- javascript - Material UI Slider 组件,鼠标释放时更新状态,同时实时滑动
- amazon-web-services - 用于 S3 传输的 AWS Lambda 跨账户密钥和角色使用
- c# - 使用字符串的 C# 计算器
- ruby-on-rails - 在 API 错误处理期间输出的 jbuilder 语法不正确
- assembly - 为什么16位指令不能访问通用寄存器的高位寄存器
- android - 子集合请求(Firestore、Coroutine)