首页 > 解决方案 > 芹菜正确的任务组链

问题描述

我有一个字符串列表,它们是文件名:

chunks_list = [["file_1", "file_2"], ["file_3", "file_4", "file_5"], ...]

我需要在 celery 任务中处理这些文件,所以我有一个任务:

@celery_app.task
def process_file_task(filename):
    # do some staff with file 
    # (e.g. produce data to Kafka)

我需要并行运行文件的内部列表。并且外部列表应该是顺序的。

处理方式应该是下一个:

worker1: file_1
worker2: file_2

下一组在第一组完成后开始:

worker1: file_3
worker2: file_4
worker3: file_5

我试图以这种方式运行我的任务:

sequence_tasks = []
for chunks in chunks_list:
    sequence_tasks.append(
        group([process_file_task.si(filename) for filename in chunks])
    )

tasks_chain = chain(
    tasks_group for tasks_group in sequence_tasks
)()
tasks_chain.get()

问题是我可能有数千个文件。而且这种运行任务的方式会消耗我所有的空闲内存,即使只有几百个。请告诉我如何以正确的方式设计我的工作流程,以免内存不足。

标签: pythonpython-3.xcelery

解决方案


你没有提到设置细节:

  • 你的工人在哪里跑?码头工人?(k8s/ecs/..) ec2?
  • 什么是工人并发
  • 每个文件的任务(在同一组内)是否必须在单独的工作人员上运行?

你有多少工人总是有一个大写的限制。假设您有 10 个工作人员,每个工作人员具有并发 1 和 1,000 个任务在一个组中。它们将几乎并行运行(因为您没有 1,000 名工人,只有 10 名)但这很好。Celery 将确保第二组的任何任务都不会开始,直到它完成所有前 1000 个任务。如果运行每个任务所需的时间相等,则每个工作人员将处理 100 个任务,然后将继续进行下一个块。如果花费的时间不同,Celery 将通过将下一个任务分配给下一个空闲工作人员来进行优化。

换句话说,您知道设置是什么(假设具有 16GB 和 8 个内核的 EC2 实例)。您还知道(我希望)单个任务可以占用的最大内存大小是多少。如果一个平均任务占用 1GB RAM,您最多可以运行 16 个并行度为 1 的任务或 6 个并行度为 2 的任务等)


推荐阅读