python - 架构建议 - 如何实现自动缩放的异步任务
问题描述
我们有一个大型应用程序,它使用 django 作为 ORM,使用 celery 作为任务运行基础设施。我们运行由事件(用户驱动或自动)触发的复杂管道,如下所示:
def pipeline_a:
# all lines are synchronous, so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2, a3, a4)
a5(first_res, all_results)
我们希望在不同的机器上运行 a1, a2, ...(每个任务可能需要不同的资源),并且并行运行的管道数量总是在变化。今天我们使用 celery,它非常方便地实现上述功能 - 但不适合自动缩放(我们将它破解为与 kubernetes 一起使用,但它没有原生支持)。
我主要想解决的问题是:
- 只有在所有先前的步骤都完成后才能“运行下一个管道步骤”(我可能事先不知道将运行哪些步骤 - 这取决于先前步骤的结果,因此这些步骤本质上是动态的)
- 今天我们尝试使用 kubernetes (EKS) 来自动扩展一些任务(SQS 队列大小是 hpa 指标)。如何让 kubernetes 不尝试终止当前正在运行的任务,但如果有新任务到达队列,仍然“启动 pod”(许多任务需要大约半小时才能完成)
到目前为止,我的经验是解决 1,celery 是最方便的方法,但它与 2 发生冲突。那么如果没有 celery,你将如何解决 1,那么我如何利用 kubernetes 来处理长时间运行的任务?
解决方案
如果我正确理解你的问题,
- 您有最多可以运行 30 分钟的异步作业。
- 作业在 K8s 上运行。
- 当前作业的输出可能决定下一个作业。
- 您有能力使用 SQS。
您可以为每个任务维护队列。为每个队列实现一个消费者。使用 Django 首先将任务添加到“a1”。更新 db 中的作业状态。
当 a1 的消费者完成执行时,它会更新 db 中的状态并推送到正确的队列。让我们说'a3'。'a3' 的消费者将读取任务。更新数据库。执行。将任务推送到正确的队列中。更新数据库。
如果使用 SQS,则将无限任务存储在队列中。您将不得不根据 SQS 队列的大小增加使用者的数量。为此,您可以使用https://github.com/Wattpad/kube-sqs-autoscaler
推荐阅读
- flutter - 如何使用 MediaQuery 构建主题?
- python - 如果 col 的行中的 str1==str 在另一个 col 的行中,则设置第三个 col 的 str1=str2
- git - Git合并问题不相关的分支历史
- kotlin - 在 Kotlin 中替换 String.format(...Locale)
- dart - Dart:如何将元素添加到从父类继承的 Map
- xero-api - XERO api 登录后删除租户连接
- python - 为什么我从 powershel 安装软件包时出错
- python - Pygame 窗口立即打开和关闭
- swift - 删除 Swift RxAlamofire 依赖项
- symfony-cmf - SymfonyCMF 中出现错误“没有一个链接的路由器能够生成路由:找不到路由”的原因是什么?