首页 > 解决方案 > Celery 按照它们被调用的顺序执行任务(在运行时)

问题描述

我有一个由链中的子任务组成的任务。如何确保在第一个任务完成之前不会开始第二次调用此任务?

@shared_task
def task(user):
    res = chain(subtask_1.s(), # each subtask takes ~1 hour
            subtask_2.s(),
            subtask_3.s())

    return res.apply_async()

django 视图现在可能会触发调用此任务:

# user A visits page that triggers task
task.delay(userA)
# 10 seconds later, while task() is still executing, user B visits page
task.delay(userB) 

这导致任务相互竞争,而不是按顺序执行。例如,一旦工人完成subtask_1()了第一个任务,它就会开始处理subtask_1()第二个任务,而不是subtask_2()subtask_3()一个任务。

有没有办法优雅地避免这种情况?我想问题是子任务添加到队列中的顺序。

  1. 我已经设置了 worker --concurreny=1,但是这仍然不会改变他从队列中消费的顺序。
  2. 官方文档(任务食谱)似乎提供了一个我不理解并且不幸对我不起作用的解决方案。
  3. 也许在任务链之后包含一个阻塞机制,带有while not res.ready(): sleep(1)一种hack?

标签: pythondjangocelerydjango-celery

解决方案


您可以等待第一个任务完成,然后像这样执行第二个任务。

res = task.delay(userA)
res.get() # will block until finished
task.delay(userB)

但它会阻塞调用线程,直到第一个线程完成。您可以链接任务以避免阻塞,但为此您必须稍微修改任务签名以接受任务结果作为参数。

@shared_task
def task(_, user): signature takes one extra argument
    # skipped

from celery.canvas import chain


chain(task.s(None, userA), task.s(userB))()

推荐阅读