python - 在特定的 Celery 任务之前和之后运行一个函数?
问题描述
我希望在 celery 任务执行之前运行一个函数,以及在任务完成后运行一个函数。但是,我只想为特定任务执行此操作。因此,我认为celery 信号task_prerun 不适合我吗?
在特定Celery 任务之前和之后运行函数的正确方法是什么?
解决方案
我认为你可以简单地添加一个单独的参数perform_func
,如果它是真的,你执行这些功能。如果您不能更改函数签名,则创建一个全局状态并将函数存储在其中。
例子:
def execute_before():
print("before task")
def execute_after():
print("after task")
以下是以下方法。
- 作为函数参数
@task(name="your_task")
def your_task(perform_func=False, *args, **kwargs):
if perform_func:
execute_before()
# Your task logic
...
if perform_func:
execute_after()
- 作为一个全球国家
STATE = [your_task]
@task(name="your_task")
def your_task(*args, **kwargs):
if your_task in STATE:
execute_before()
# Your task logic
...
if your_task in STATE:
execute_after()
编辑
回想起来,我觉得还有另一种方式。您可以创建一个全局任务处理程序并通过它传递所有请求。
@task(name="global_handler")
def global_handler(func: Callable, perform_func=False, *args, **kwargs):
if perform_func:
execute_before()
func() # call as a normal function
if perform_func:
execute_after()
def your_task(*args, **kwargs):
# Your task logic
...
if __name__ == "__main__":
# task caller function
global_handler.delay(your_task, True)
这个技巧是有代价的。如果您说将任务状态存储在某个数据库中或将其作为日志,那么您的所有条目基本上都将具有相同的任务名称,这会使分离任务变得困难。您显然可以通过传递的参数来隔离它,但是对此进行查询会困难得多。从好的方面来说,这种方法绝对遵循 DRY 原则。
推荐阅读
- javascript - 如何在 Three.js 中创建无限地板(天际线)?
- bash - bash - source builtin 和 $HISTCMD 变量
- bash - `echo " $1 % 2" | bc -l` 在终端或 bash 脚本中不起作用,但 `bc` 在 `bc` 中接受这些参数
- rxjs - 如何使用 NGRX 在 Angular 5 中防止内容跳转/滚动
- angular - 如何在模板字符串中转义引号和大括号
- python - 如何在 Auth 对象的 __call__ 方法中签署 requests.Request 的正文?
- scala - Akka HTTP 设置响应标头
- android - 是否可以创建一个接受任何对象列表的动态 RecyclerView 适配器