django - Django celery 注册周期性任务
问题描述
我将Celery 4.4与Django 2.2一起使用
我必须创建一个Periodic Task,我将PeriodicTask扩展为
from celery.schedules import crontab
from celery.task import PeriodicTask
class IncompleteOrderHandler(PeriodicTask):
run_every = crontab(
minute='*/{}'.format(getattr(settings, 'INCOMPLETE_ORDER_HANDLER_PULSE', 5))
)
def run(self, *args, **kwargs):
# Task definition
eligible_users, slot_begin, slot_end = self.get_users_in_last_slot()
map(lambda user: self.process_user(user, slot_begin, slot_end), eligible_users)
之前注册上述任务,我曾经调用
from celery.registry import tasks
tasks.register(IncompleteOrderHandler)
但是registry
现在celery
. 如何注册上述周期性任务?
解决方案
我对基于类的芹菜任务有同样的问题。这必须有效,但它没有!意外地,我的问题被这两个变化的每个人解决了:
我在viewsets.py中的tasks.py中导入了一个基于类的任务,突然我发现这样做之后,celery在tasks.py中找到了所有任务。
这是我的基本芹菜设置文件:
from __future__ import absolute_import import os from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'picha.settings') app = Celery('picha') app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
我将最后一行更改为
app.autodiscover_tasks(lambda: settings.CELERY_TASKS)
并将 CELERY_TASKS 列表添加到 settings.py 并在其中写入所有 tasks.py 文件路径,然后 celery 找到了任务。
我希望其中之一对你有用。
推荐阅读
- sql - 从 Oracle 中的日期获取季度的问题
- google-cloud-platform - GKE 堆栈驱动程序跟踪报告按集群按环境按服务按服务版本
- java - 当用户输入不正确时,我将如何做到这一点,以便 while/else 语句正确地重复自身?
- javascript - 如何在 Electron 中正确加载 lit-html 模块
- text-to-speech - Google Cloud 文字转语音单词时间戳
- python - 什么是实时更新 SQL Server 的好方法?
- android - 删除代码后,crashlytics 继续注册异常
- node.js - 如何为 nodejs Express 服务器启用 CORS?
- amazon-web-services - 如何计算 AWS 中子网的总 IP
- javascript - 如何使用 JS async/await 等待 AJAX 响应