首页 > 解决方案 > 如何确保 Celery 任务在 Pytest 中排队?

问题描述

我有一个 API 端点来注册新用户。“欢迎电子邮件”将排队并异步执行此任务。我有 2 个单元测试要检查:

  1. Api 确实将用户信息保存到 DB OK
  2. Celery 任务确实发送带有正确内容+模板的电子邮件

我想添加第三个单元测试以确保“将用户表单保存到数据库后,端点必须排队发送电子邮件”

我尝试使用 celery.AsyncResult 但它要求我运行工人。更进一步,即使工人准备好了,我们仍然无法验证任务是否入队,因为模棱两可的 PENDING 状态:

  1. 任务存在于队列中但尚未执行:PENDING
  2. 队列中不存在任务:PENDING

有人遇到这个问题吗?我该如何解决?

标签: pythonunit-testingcelerypytest

解决方案


在测试环境中解决这个问题的常用方法是使用task_always_eager配置设置,它基本上指示 Celery 像常规函数一样运行任务。代替 AsyncResult,Celery 将创建一个行为相同但执行逻辑完全不同的 EagerResult 类型的对象。


推荐阅读