python - celery 不能自定义属性
问题描述
win10 使用 celery 报错。
我在windows 10中使用的是Celery 5.1.0,worker可以用evelent启动。并且在使用 api 进行测试时遇到了一些错误。首先是 on_failure 不起作用。我认为可能是平台问题,我将在 Linux(我的 ubuntu 开发平台)中测试相同的代码。但后来我收到以下错误。我想避免在任务开始/结束时创建会话并销毁它,因此在任务初始化时使用 SQLAlchemy。但是当我大约三年前使用 Celery 3.1 时,代码运行良好。但是相同的代码不能在 Celery 5.1 中工作。
自定义基础任务:
class BaseTask(Task):
_db = None
def after_return(self, status, retval, task_id, args, kwargs, einfo):
_args = dict(status=status, retval=retval, task_id=task_id)
logger.info(f"Entering into after_return callback args: {_args}")
if self._db is not None:
logger.info("Entering into after_return session remove")
self._db.close()
if einfo:
logger.exception(einfo)
def on_success(self, retval, *args, **kwargs):
logger.info(f"Entering into callback on_success: {retval}")
super().on_success(retval, *args, **kwargs)
def on_failure(self, exc, task_id, *args, **kwargs):
logger.info(f"Entering into callback on_failure..., and retry. task_id: {task_id}")
logger.exception(exc)
self.retry(countdown=10, exc=exc, max_retries=3)
@property
def session(self):
if self._db is None:
self._db = Database().connect()
return self._db
用法:
@client.task(bind=True, Base=BaseTask, name=athena:credit_report)
def generate_credit_report_task(self, name, industry_json):
task_id = self.request.id
obj = self.session.query(CeleryTask).filter(CeleryTask.task_id == task_id).first()
结果:
Traceback (most recent call last):
File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\app\trace.py, line 412, in trace_task
R = retval = fun(*args, **kwargs)
File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\app\trace.py, line 704, in __protected_call__
return self.run(*args, **kwargs)
File C:\Users\wbdu\ws\athena\athena\tasks\company_task.py, line 38, in generate_credit_report_task
print(generate_credit_report_task.db)
File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\local.py, line 146, in __getattr__
return getattr(self._get_current_object(), name)
AttributeError: 'generate_credit_report_task' object has no attribute 'session'
谁能帮忙解决问题,感激不尽。
解决方案
推荐阅读
- email - 如何为 Mailgun 使用自定义 SMTP URL?
- jenkins - 如何在jenkinsfile中使用不同的环境变量在管道中定期触发构建?
- javascript - 使用循环标记多个元素
- php - 如何在 PDO 中使用 SELECT INTO
- javascript - 如何一次有条件地更新多个元素
- background-process - Python脚本在后台运行不写入文件?
- jquery - 我如何用计算jquery制作订单
- c# - 更新库存 ID 条目(提交)上的 SO 行扩展字段
- simulink - “查找”与 HDL 代码兼容的替代块
- blockchain - 锯齿交易处理器不响应 ping