首页 > 解决方案 > Django celery 注册周期性任务

问题描述

我将Celery 4.4Django 2.2一起使用

我必须创建一个Periodic Task,我将PeriodicTask扩展为


from celery.schedules import crontab
from celery.task import PeriodicTask

class IncompleteOrderHandler(PeriodicTask):
    run_every = crontab(
        minute='*/{}'.format(getattr(settings, 'INCOMPLETE_ORDER_HANDLER_PULSE', 5))
    )

    def run(self, *args, **kwargs):
        # Task definition
        eligible_users, slot_begin, slot_end = self.get_users_in_last_slot()
        map(lambda user: self.process_user(user, slot_begin, slot_end), eligible_users)

之前注册上述任务,我曾经调用

from celery.registry import tasks

tasks.register(IncompleteOrderHandler)

但是registry现在celery. 如何注册上述周期性任务?

标签: djangocelerydjango-celerydjango-celery-beat

解决方案


我对基于类的芹菜任务有同样的问题。这必须有效,但它没有!意外地,我的问题被这两个变化的每个人解决了:

  1. 我在viewsets.py中的tasks.py中导入了一个基于类的任务,突然我发现这样做之后,celery在tasks.py中找到了所有任务。

  2. 这是我的基本芹菜设置文件:

    from __future__ import absolute_import
    import os
    from celery import Celery
    from django.conf import settings
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'picha.settings')
    app = Celery('picha')
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    

    我将最后一行更改为app.autodiscover_tasks(lambda: settings.CELERY_TASKS)并将 CELERY_TASKS 列表添加到 settings.py 并在其中写入所有 tasks.py 文件路径,然后 celery 找到了任务。

我希望其中之一对你有用。


推荐阅读