python - python中的并行查询,在kubernetes中有一个文件和几个pod
问题描述
我正在开发一个 python 项目,它将托管在谷歌云提供商的 kubernets 上。这个想法是读取数百万行的文件,其中每一行是 API 中查询的输入键
def getEndpoint(line):
payload="{\r\n \"keyQuery\": \""+line+"\"\r\n}"
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload, verify=False)
response = response.text.encode('utf8')
fileOpen = open('file.txt', 'r')
for lines in fileOpen:
getEndpoint(lines)
我想在几个 Kubernetes POD 上运行我的应用程序,因为我想具有可扩展性,即同时运行多个查询。然而,在这个代码结构中,每个 pod 最终都会从头开始迭代文件,读取已经咨询过的行。这不是我想要的。然后就产生了两个想法:
拆分文件,每个拆分将分布在 pod 之间。(例如:对于一个包含 10 个 pod 的 100 行文件,每个 pod 将读取 10 行)
在运行应用程序之前,使用文件的行创建一个消费队列,这样所有的 pod 都会读取队列而不是直接读取文件。
选项 2 在我看来更具可扩展性和更快。但我想就使用文件作为参考进行查询的最佳方式提出建议。例如,我可能想在 24 小时内运行 100 万次查询。
解决方案
我认为您应该使用异步任务工作者,例如Celery或Dramatiq。
概念是将任务(您的查询行)发送到消息代理(Redis 或 RabbitMQ),然后让工作人员使用任务(接收查询行并向 API 发出请求)
然而,The Dramatiq 或 Celery 有一个功能,比如如果任务失败则重试任务。
不确定你是否想要,但你可以使用这个 repo 进行探索, https://github.com/matiaslindgren/celery-kubernetes-example
推荐阅读
- c# - 将属性自动映射到子属性的属性
- ios - 在 Swift 4 中解码泛型类的可编码树
- chef-infra - Chef windows_package 无法进行静默安装;询问用户输入
- angular - 如何在 Angular-4 的 ng2-table 中添加复选框?
- cucumber - 在黄瓜中执行测试用例之前需要将功能文件复制到某个位置
- mysql - 使用 join 获取特定 fk 的记录
- c# - Console.ReadKey 与 Thread.Sleep 意外回显
- java - Java中引用变量(references)的默认构造函数
- math - 将欧拉角转换为四元数时更改 XYZ 顺序
- javascript - JS 如何将序列化对象转换为数组?