首页 > 技术文章 > celery的使用

fivenian 2021-08-25 14:25 原文

celery的异步任务

  1. 安装celery
$ pip install -U celery

1)安装相关依赖

$ pip install "celery[redis,auth,msgpack]"

序列化程序

  • celery[auth]

    用于使用auth安全序列化程序。

  • celery[msgpack]

    用于使用 msgpack 序列化程序。

  • celery[redis]

    使用 Redis 作为消息传输或结果后端。

  1. 安装redis

这里我们使用redis作为celery的broker,作为任务队列的存储和结果的存储。

对于 Redis 支持,您必须安装其他依赖项。您可以使用celery[redis] bundle一次性安装 Celery 和这些依赖项:

$ pip install -U "celery[redis]"

1)配置

配置很简单,只需配置你的 Redis 数据库的位置:

app.conf.broker_url = 'redis://localhost:6379/0'

其中 URL 的格式为:

redis://:password@hostname:port/db_number

方案后面的所有字段都是可选的,并且将默认为localhost 端口 6379,使用数据库 0。

  1. 使用ceelry

1)首先我们可以创建一个celery的文件夹,然后创建一个tasks.py文件

  • celery/tasks.py
from celery import Celery

# 第一个参数就是当前脚本的名称,backend 任务执行结果的存储地址broker 任务队列的存储地址
app = Celery('tasks', backend='redis://127.0.0.1', broker='redis://127.0.0.1')

@app.task
def add(x, y):
    return x + y
  • celery/run_tasks.py
from tasks import add

result = add.delay(1, 2)
print('Is task ready: %s' % result.ready())  # False说明任务还没有执行完
run_result = result.get(timeout=1)
print('task result: %s' % run_result)

print('Is task ready: %s' % result.ready())
  1. 启动celery
$ cd celry
$ celery -A tasks worker --loglevel=info
  1. 使用flower监控celery任务的执行情况
pip install flower

启动flower,指定我们的应用,确保你的celery是启动的。

cd celery
celery -A tasks flower --broker=redis://@localhost:6379/0

运行结果:

celery [celery args] flower [flower args].
[I 210825 10:54:00 command:152] Visit me at http://localhost:5555
[I 210825 10:54:00 command:159] Broker: redis://127.0.0.1:6379//
[I 210825 10:54:00 command:160] Registered tasks: 

我们就可以通过5555端口看到celery异步任务的运行情况了

Django中使用celery

官方地址:https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html

  1. 创建celery文件

根据官方文档的说明,我们可以直接在Django项目同名的应用下创建celery.py文件

  • recruitment/recruitment/celery.py
import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SEttINGS_MODULE', 'recruitment.base')  # 这里我把配置文件放到了根目录下的settings/base.py 中

app = Celery('recruitment')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

def debug_task(self):
  print(f'Request: {self.request!r}')

然后我们需要在这个celery.py文件所在的目录的__init__文件中添加:

from __future__ import absolute_import, unicode_literals


# This will make sure the app is always imported when/保证所有app下的任务都能导入进来
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
  1. 添加celery配置
  • settings/base.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYD_MAX_TASKS_PER_CHILD = 10
CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_work.log")
CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs", "celery_beat.log")
  1. 在别的应用下使用celery执行异步任务 [使用celery异步发送钉钉群消息通知]
  1. 首先我们需要在应用下创建一个tasks.py文件
  • interview/tasks.py
from __future__ import absolute_import, unicode_literals

from celery import shared_task
from .dingtalk import send

@shared_task
def send_dingtalk_message(message):
    send(message)
  • interview/dingtalk.py
from dingtalkchatbot.chatbot import DingtalkChatbot

from django.conf import settings


def send(message, at_mobiles=[]):
    # 引用 settings里面配置的钉钉群消息通知的WebHook地址:
    webhook = settings.DINGTALK_WEB_HOOK

    # 初始化机器人小Y,
    xiaoY = DingtalkChatbot(webhook)

    # 方式二:勾选“加签”选项时使用(v1.5以上新功能)
    # xiaoY = DingtalkChatbot(webhook, secret=secret)

    # Text消息@所有人
    xiaoY.send_text(msg=('消息通知: %s' % message), at_mobiles=at_mobiles)
  • interview.views.py
from interview.tasks import send_dingtalk_message

def notify_interview(modeladmin, request, queryset):
    candidates = ''
    interviewers = ''
    for obj in queryset:
        candidates = obj.userame + '' + candidates
        interviewers = obj.first_interviewer_user + '' + interviewers
    # 这里的消息发送到钉钉, 或者通过 Celery 异步发送到钉钉
    send_dingtalk_message.delay('候选人 %s 进入面试环节, 亲爱的面试官请做好面试准备:%s。' % (candidates, interviewers))
  1. 启动celery服务

启动celery服务,到我们的项目根目录启动,然后执行

$ celery -A recruitment worker -l info

如果需要制定配置文件,如果在mac下可以执行:

$ DJANGO_SEttINGS_MODULE=settings.base celery --app=recruitment worker --loglevel=info

启动flower监控异步任务

$ celery -A recruitment flower --broker=redis://localhost:6379/0

celery定时任务

Django与celery集成 定时任务

使用自定义调度程序类

可以在命令行(--scheduler参数)上指定自定义调度程序类。

默认调度程序是celery.beat.PersistentScheduler,它只是在本地shelve 数据库文件中跟踪上次运行时间。

还有django-celery-beat扩展,它在 Django 数据库中存储计划,并提供一个方便的管理界面来管理运行时的周期性任务。

要安装和使用此扩展:

  1. 使用pip安装包:

    $ pip install django-celery-beat
    
  2. django_celery_beat模块添加到INSTALLED_APPS您的 Django 项目中settings.py

    INSTALLED_APPS = (
        ...,
        'django_celery_beat',
    )
    

    请注意,模块名称中没有破折号,只有下划线。

  3. 应用 Django 数据库迁移,以便创建必要的表:

    $ python manage.py migrate
    
  4. 使用调度程序启动celery beat服务django_celery_beat.schedulers:DatabaseScheduler

    $ celery -A proj beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    注意:您也可以beat_scheduler直接将其添加为设置。

  5. 访问 Django-Admin 界面来设置一些周期性任务。

启动服务

$ DJANGO_SETTINGS_MODULES=settings.base celery -A recruitment worker -l info
# DJANGO_SETTINGS_MODULE=settings.local celery -A recruitment beat
# 启动beat
$ DJANGO_SETTINGS_MODULE=settings.base celery -A recruitment beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
$ DJANGO_SETTINGS_MODULE=settings.base celery -A recruitment flower

管理定时任务的方法

  • 在Admin后台添加管理定时任务

    执行数据迁移后管理后台中就会生成一个Periodic Tasks的定时任务模块

  • 系统启动时自动注册定时任务

    在主应用下创建一个celery.py文件

    recruitment/celery.py

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('recruitment')
    
    @app.on_after_configure.connect  # 在程序启动的时候执行
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
    
    @app.task
    def test(arg):
        print(arg)
    
  • 直接设置应用的beat_schedule

    recruitment/tasks.py

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    @shared_task
    def add(a, b,):
        return a + b
    

    recruitment/celery.py

    from celery import Celery
    
    app = Celery('recruitment')
    
    # this is important to load the celery tasks
    from recruitment.tasks import add  
    
    app.conf.beat_schedule = {
        'add-every-10-seconds': {
            'task': 'recruitment.tasks.add',  # 这里需要显示的将任务添加到这里
            'schedule': 10.0,
            'args': (16, 4, )
        },
    }
    
  • 运行时添加定时任务

    import json
    from celery import Celery
    from django_celery_beat.models import PeriodicTask, IntervalSchedule
    
    app = Celery('recruitment')
    
    @app.task
    def test(arg):
        print(arg)
    
    # 先创建定时策略
    schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS,)  # 每十秒运行一次
    
    # 再创建任务
    task = PeriodicTask.objects.create(interval=schedule, name='hello world', task='recruitment.celery.test', args=json.dumps(['welcome']),)
    

推荐阅读