首页 > 解决方案 > 架构建议 - 如何实现自动缩放的异步任务

问题描述

我们有一个大型应用程序,它使用 django 作为 ORM,使用 celery 作为任务运行基础设施。我们运行由事件(用户驱动或自动)触发的复杂管道,如下所示:


def pipeline_a:
# all lines are synchronous, so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2, a3, a4)
a5(first_res, all_results)

我们希望在不同的机器上运行 a1, a2, ...(每个任务可能需要不同的资源),并且并行运行的管道数量总是在变化。今天我们使用 celery,它非常方便地实现上述功能 - 但不适合自动缩放(我们将它破解为与 kubernetes 一起使用,但它没有原生支持)。

我主要想解决的问题是:

  1. 只有在所有先前的步骤都完成后才能“运行下一个管道步骤”(我可能事先不知道将运行哪些步骤 - 这取决于先前步骤的结果,因此这些步骤本质上是动态的)
  2. 今天我们尝试使用 kubernetes (EKS) 来自动扩展一些任务(SQS 队列大小是 hpa 指标)。如何让 kubernetes 不尝试终止当前正在运行的任务,但如果有新任务到达队列,仍然“启动 pod”(许多任务需要大约半小时才能完成)

到目前为止,我的经验是解决 1,celery 是最方便的方法,但它与 2 发生冲突。那么如果没有 celery,你将如何解决 1,那么我如何利用 kubernetes 来处理长时间运行的任务?

标签: pythondjangoarchitectureautoscaling

解决方案


如果我正确理解你的问题,

  • 您有最多可以运行 30 分钟的异步作业。
  • 作业在 K8s 上运行。
  • 当前作业的输出可能决定下一个作业。
  • 您有能力使用 SQS。

您可以为每个任务维护队列。为每个队列实现一个消费者。使用 Django 首先将任务添加到“a1”。更新 db 中的作业状态。

当 a1 的消费者完成执行时,它会更新 db 中的状态并推送到正确的队列。让我们说'a3'。'a3' 的消费者将读取任务。更新数据库。执行。将任务推送到正确的队列中。更新数据库。

如果使用 SQS,则将无限任务存储在队列中。您将不得不根据 SQS 队列的大小增加使用者的数量。为此,您可以使用https://github.com/Wattpad/kube-sqs-autoscaler


推荐阅读