首页 > 解决方案 > 在特定的 Celery 任务之前和之后运行一个函数?

问题描述

我希望在 celery 任务执行之前运行一个函数,以及在任务完成后运行一个函数。但是,我只想为特定任务执行此操作。因此,我认为celery 信号task_prerun 不适合我吗?

在特定Celery 任务之前和之后运行函数的正确方法是什么?

标签: pythoncelery

解决方案


我认为你可以简单地添加一个单独的参数perform_func,如果它是真的,你执行这些功能。如果您不能更改函数签名,则创建一个全局状态并将函数存储在其中。

例子:

def execute_before():
    print("before task")

def execute_after():
    print("after task")

以下是以下方法。

  1. 作为函数参数
@task(name="your_task")
def your_task(perform_func=False, *args, **kwargs):
    if perform_func:
        execute_before()
    # Your task logic
    ...
    if perform_func:
        execute_after()
  1. 作为一个全球国家
STATE = [your_task]

@task(name="your_task")
def your_task(*args, **kwargs):
    if your_task in STATE:
        execute_before()
    # Your task logic
    ...
    if your_task in STATE:
        execute_after()

编辑

回想起来,我觉得还有另一种方式。您可以创建一个全局任务处理程序并通过它传递所有请求。

@task(name="global_handler")
def global_handler(func: Callable, perform_func=False, *args, **kwargs):
    if perform_func:
        execute_before()
    func() # call as a normal function
    if perform_func:
        execute_after()

def your_task(*args, **kwargs):
    # Your task logic
    ...

if __name__ == "__main__":
    # task caller function
    global_handler.delay(your_task, True)

这个技巧是有代价的。如果您说将任务状态存储在某个数据库中或将其作为日志,那么您的所有条目基本上都将具有相同的任务名称,这会使分离任务变得困难。您显然可以通过传递的参数来隔离它,但是对此进行查询会困难得多。从好的方面来说,这种方法绝对遵循 DRY 原则。


推荐阅读