首页 > 解决方案 > 芹菜不并行执行块组

问题描述

我有块列表,它们是文件名

chunks_list = [["file_1", "file_2", "file_3"], ["file_4", "file_5", "file_6"], ...]

我有一个处理这些块的任务:

@celery_app.task
def process_files_task(files: List[str]):
    for file in files:
        logger.info(file)
        # processing each file

我需要并行运行块,并且chunks_list有 8 个块。所以 celery worker 并发也是 8。我运行我的任务是这样的:

chunks_group = process_files_task.chunks([(chunk,) for chunk in chunks_list], 8).group()
chunks_group.delay().get()

问题是所有任务仅由一名工人执行。这就是我在日志中看到的:

[INFO/ForkPoolWorker-4] file_1
[INFO/ForkPoolWorker-4] file_2
[INFO/ForkPoolWorker-4] file_3
[INFO/ForkPoolWorker-4] file_4
...

请告诉我我做错了什么让每个免费的分叉者都执行任务

标签: pythonpython-3.xcelery

解决方案


您可以设置worker_prefetch_multiplier为 1。

一次预取多少消息乘以并发进程数。默认值为 4(每个进程 4 条消息)。然而,默认设置通常是一个不错的选择——如果您有很长时间运行的任务在队列中等待并且您必须启动工作程序,请注意,第一个启动的工作程序将收到最初四倍的消息数量。因此,任务可能不会公平地分配给工人。

更多细节在这里


推荐阅读