首页 > 解决方案 > celery 不能自定义属性

问题描述

win10 使用 celery 报错。

我在windows 10中使用的是Celery 5.1.0,worker可以用evelent启动。并且在使用 api 进行测试时遇到了一些错误。首先是 on_failure 不起作用。我认为可能是平台问题,我将在 Linux(我的 ubuntu 开发平台)中测试相同的代码。但后来我收到以下错误。我想避免在任务开始/结束时创建会话并销毁它,因此在任务初始化时使用 SQLAlchemy。但是当我大约三年前使用 Celery 3.1 时,代码运行良好。但是相同的代码不能在 Celery 5.1 中工作。

自定义基础任务:

class BaseTask(Task):
   _db = None

   def after_return(self, status, retval, task_id, args, kwargs, einfo):
       _args = dict(status=status, retval=retval, task_id=task_id)
       logger.info(f"Entering into after_return callback  args: {_args}")
       if self._db is not None:
           logger.info("Entering into after_return session remove")
           self._db.close()

       if einfo:
           logger.exception(einfo)

   def on_success(self, retval, *args, **kwargs):
       logger.info(f"Entering into callback on_success: {retval}")
       super().on_success(retval, *args, **kwargs)

   def on_failure(self, exc, task_id, *args, **kwargs):
       logger.info(f"Entering into callback on_failure..., and retry. task_id: {task_id}")
       logger.exception(exc)
       self.retry(countdown=10, exc=exc, max_retries=3)

   @property
   def session(self):
       if self._db is None:
           self._db = Database().connect()
       return self._db

用法:

@client.task(bind=True, Base=BaseTask, name=athena:credit_report)
def generate_credit_report_task(self, name, industry_json):
   task_id = self.request.id
   obj = self.session.query(CeleryTask).filter(CeleryTask.task_id == task_id).first()

结果:

Traceback (most recent call last):
  File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\app\trace.py, line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\app\trace.py, line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File C:\Users\wbdu\ws\athena\athena\tasks\company_task.py, line 38, in generate_credit_report_task
    print(generate_credit_report_task.db)
  File c:\users\wbdu\ws\athena\venv\lib\site-packages\celery\local.py, line 146, in __getattr__
    return getattr(self._get_current_object(), name)
AttributeError: 'generate_credit_report_task' object has no attribute 'session'

谁能帮忙解决问题,感激不尽。

标签: pythoncelery

解决方案


推荐阅读