首页 > 解决方案 > 如何动态设置芹菜任务的任务路线?

问题描述

我有一个 API 端点,它从用户那里获取一些数据并用 1 个或多个 celery 工人处理它。

例如,有 3 个不同的任务:

@celery.task
def a(data):
    res = do_a_work(data)
    if res.status:
         b.apply_async(res)
    else:
         return {'message': 'a task failed'}


@celery.task
def b(data):
    res = do_b_work(data)
    if res.status:
        c.apply_async(res)
    else:
         return {'message': 'b task failed'}


@celery.task
def c(data):
    res = do_c_work(data)
    return {'result': res}

aAPI端点b根据数据提交任务。

可能的流程:

我计划使用 celery 链,我可以定义应该在该数据上执行哪些任务。a但问题是,如果任务或任务b状态为 False ,我没有找到停止处理的方法。

另一种方法是从任务示例中的任务内部提交额外的任务,但问题是用户需要获取一个 task_id,稍后他可以使用它来获取结果。

我可以使用第三种方法来解决这个问题吗?

标签: pythonpython-3.xrediscelerycelery-task

解决方案


Achord是执行此操作的正确方法,您只需要重写do_a_workdo_b_work引发异常而不返回True/False值。重写这些函数以引发异常后,您可以使用和弦编写流程并设置错误。

打电话给a你可能会做一些类似beloe的事情

chord(chord(a.s(data), b.s()).on_error(handle_a_error.s()), c.s()).on_error(handle_b_error.s())

推荐阅读