首页 > 解决方案 > Celery shared_task 挂在任务调用上

问题描述

我有一个带有打印语句的简单芹菜任务。问题出在 shared_task 装饰器中,因为如果我将装饰器更改为 app.task 它工作正常。

@shared_task
def generate_pdf():
  print('Just to test if its working')

但是从视图或 django shell 调用这个任务只是卡住了,直到我按下 Ctrl+C。这是芹菜挂起时键盘中断后的回溯。

^CTraceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 41, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 41, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 41, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/usr/local/lib/python3.8/site-packages/celery/local.py", line 143, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.8/site-packages/celery/local.py", line 105, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.8/site-packages/celery/app/__init__.py", line 69, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 43, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 1259, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 511, in finalize
    _announce_app_finalized(self)
  File "/usr/local/lib/python3.8/site-packages/celery/_state.py", line 52, in _announce_app_finalized
    callback(app)
  File "/usr/local/lib/python3.8/site-packages/celery/app/builtins.py", line 161, in add_chain_task
    def chain(*args, **kwargs):
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 425, in _create_task_cls
    ret = self._task_from_fun(fun, **opts)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 472, in _task_from_fun
    task.bind(self)  # connects task to this app
  File "/usr/local/lib/python3.8/site-packages/celery/app/task.py", line 332, in bind
    setattr(cls, attr_name, conf[config_name])
  File "/usr/local/lib/python3.8/site-packages/celery/utils/collections.py", line 391, in __getitem__
    return getitem(k)
  File "/usr/local/lib/python3.8/site-packages/celery/utils/collections.py", line 249, in __getitem__
    return mapping[_key]
  File "/usr/local/lib/python3.8/collections/__init__.py", line 1006, in __getitem__
    if key in self.data:
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 43, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 138, in data
    return self.callback()
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 928, in _finalize_pending_conf
    conf = self._conf = self._load_config()
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 962, in _load_config
    self.on_after_configure.send(sender=self, source=self._conf)
  File "/usr/local/lib/python3.8/site-packages/celery/utils/dispatch/signal.py", line 276, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/code/assetmanagementservice/celery.py", line 20, in setup_periodic_tasks
    generate_weekly_report.s(),
  File "/usr/local/lib/python3.8/site-packages/celery/local.py", line 143, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.8/site-packages/celery/local.py", line 105, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.8/site-packages/celery/app/__init__.py", line 69, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.8/site-packages/kombu/utils/objects.py", line 43, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 1259, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.8/site-packages/celery/app/base.py", line 506, in finalize
    with self._finalize_mutex:
KeyboardInterrupt

app.task 和 shared_task 装饰器之间是否有任何区别,为什么 shared_task 在使用相同的配置 app.task 工作时不起作用?

配置: 项目。初始化.py

from .celery import app as celery_app

__all__ = ['celery_app', ]

项目.celery.py

import os

from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

项目设置.py

CELERY_BROKER_URL = f"amqp://{os.environ['RABBITMQ_DEFAULT_USER']}:{os.environ['RABBITMQ_DEFAULT_PASS']}@{os.environ['RABBITMQ_DEFAULT_HOST']}:5672/{os.environ['RABBITMQ_DEFAULT_VHOST']}"

someapp.tasks.py

from celery import shared_task
@shared_task
def generate_pdf():
  print('Just to test if its working')

这就是我的运行方式:

>>> from someapp.tasks import generate_pdf
>>> generate_pdf.delay() 
Just to test if its working
# here it stucks forever (tried with apply_async it stucks too), but notice the print statement worked

但是将装饰器从 shared_task 更改为 app.task 效果很好。

标签: djangodjango-rest-frameworkcelery

解决方案


推荐阅读