首页 > 解决方案 > 芹菜画布行为在异步模式和渴望模式之间有所不同

问题描述

Celery 画布在异步和急切模式下的工作方式存在一些差异。我注意到,在替换自身的动态任务中,一个组后跟一个链不会将结果发送到链上的下一个。

好吧,这似乎很复杂,让我举个例子:

给定以下任务:

@shared_task(bind=True)
def grouped(self, val):
    task = (
        group(asum.s(val, n) for n in range(val)) | asum.s(val)
    )
    raise self.replace(task)

当它像这样分组在另一个画布中时:

@shared_task(bind=True)
def flow(self, val):
    workflow = (asum.s(1, val) |
                asum.s(2) |
                grouped.s() |
                amul.s(3))

    return self.replace(workflow)

当处于渴望模式时,任务amul将不会收到分组的结果。

为了真正说明这个问题,我在 github 上创建了一个示例项目,您可以在其中深入研究问题并通过一些快速解决方案帮助我,可能还有一些关于 celery 项目的 PR。

https://github.com/gutomaia/celery_equation

----已编辑----

在这个项目中,我陈述了两种使用 celery 的不同行为。在异步模式下,thouse 任务按预期工作。

>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895

标签: celerycelery-taskcelery-canvas

解决方案


我在测试用例中遇到了这种情况。对于未来的读者,至少从 celery 4.4.0 开始,以下习语将适用于所有上下文,包括同步、进程内执行:

    return self.replace(...)

使用raise或简单地让函数在之后立即结束Task.replace只会在异步模式下工作。相关代码就在 Task.replace 的末尾

        if self.request.is_eager:
            return sig.apply().get()
        else:
            sig.delay()
            raise Ignore('Replaced by new task')

推荐阅读