首页 > 技术文章 > Celery模块使用

wsongl 2021-01-10 18:32 原文

一、celery概述

1.1、celery定义

celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。

 

1.2、各模块版本

模块 版本
python 3.6.12
django 2.2.17
celery 5.0.5
django-celery-beat 2.1.0
django-celery-results 2.0.0

 

1.3、参考

https://www.cnblogs.com/wdliu/p/9530219.html

https://www.cnblogs.com/f-g-f/p/11300656.html

 

二、纯代码celery

2.1、目录结构

celery01
├── dy.py
└── proj01
        ├── celeryconfig.py
        ├── celery.py
        ├── __init__.py
        └── tasks.py

 

2.2、代码

# celeryconfig.py
# celery配置,队列和结果存放地址,序列化格式
BROKER_URL = 'redis://8.136.184.235:6479'
CELERY_RESULT_BACKEND = 'redis://8.136.184.235:6479/0'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

  

# celery.py
# 实例化celery,生成实例对象
from __future__ import absolute_import
from celery import Celery

app = Celery('proj01', include=["proj01.tasks"])
app.config_from_object("proj01.celeryconfig")

if __name__ == "__main__":
    app.start()

  

# tasks.py
# celery的具体功能代码,此处为add函数
from __future__ import absolute_import
from time import sleep
from proj01.celery import app


@app.task
def add(x, y):
    sleep(1)
    return x + y

  

# __init__.py
# 空文件,无内容

  

# dy.py
from proj01.tasks import add
import time

t1 = time.time()

r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)

r_list = [r1, r2, r3, r4, r5]
for r in r_list:
    while not r.ready():
        pass
    print('task:', r)
    print(r.result)

t2 = time.time()

print('共耗时:%s' % str(t2-t1))

  

2.3、运行代码

启动celery服务:

在项目目录下,此项目目录是/root/celery01,执行如下命令:

celery -A proj01.celery worker -l info

 

调用服务:

在项目目录下,此项目目录是/root/celery01,执行如下命令:
python dy.py

 

2.4、结果

dy.py运行结果,如下图所示:

 

celery服务运行结果,如下图所示:

 

 

三、纯代码celery定时任务

3.1、概述

实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。

 

3.2、代码变动

tip:在上一步(第二章 纯代码 Celery)的代码基础上改动,红色为变动内容

# celery.py
# 添加定时任务的功能,硬编码添加定时任务
from __future__ import absolute_import
from celery import Celery
from datetime import timedelta
from celery.schedules import crontab

app = Celery('proj01', include=["proj01.tasks"])
app.config_from_object("proj01.celeryconfig")

## 定时任务方式一
app.conf.CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'proj01.tasks.add',
        'schedule': timedelta(seconds=5),
        'args': (16, 16)
    }
}
# crontab配置方法:https://www.cnblogs.com/alex3714/p/6351797.html
## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来
#app.conf.beat_schedule = {
#    # Executes every Monday morning at 7:30 a.m.
#    'add': {
#        'task': 'proj01.tasks.add',
#        'schedule': crontab(hour=7, minute=30, day_of_week=1),
#        'args': (16, 16),
#    },
#}

if __name__ == "__main__":
    app.start()

  

3.3、运行代码:

启动celery服务:

在项目目录下,此项目目录是/root/celery01,执行如下命令:

celery -A proj01.celery worker -l info --beat

 

3.4、结果

会按照定时设置,自动执行任务。

 

 

四、django Celery

4.1、目录结构

红色为新增或改动的文件

proj02
├── app02
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── models.py
│   ├── tasks.py                   # 应用下,会被异步调用的函数
│   ├── tests.py
│   └── views.py
├── manage.py
└── proj02
        ├── celery.py               # 实例化celery,生成celery实例对象
        ├── __init__.py
        ├── settings.py            # celery配置
        ├── urls.py
        └── wsgi.py

 

4.2、代码

# setting.py
# 基础配置(apps、db)这里就不说了
... # 省略其他配置内容
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60

BROKER_URL = 'redis://127.0.0.1:6479'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6479/0'
CELERY_TASK_SERIALIZER = 'msgpack'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

  

# celery.py
# 实例化celery对象,生成celery实例对象
from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj02.settings')
app = Celery('proj02')
# 此处不能用namespace,否则会出错
# app.config_from_object('django.conf:settings', namespace='CELERY')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

  

# tasks.py
# 定义异步任务函数
from celery import shared_task

@shared_task
def add(x, y):
    print(x+y)
    return x + y

  

# app02/views.py
# 定义视图函数,调用tasks.py里的异步函数
from django.http import JsonResponse
from app02 import tasks

def index(request, *args, **kwargs):
    res=tasks.add.delay(1,3)
    return JsonResponse({'status':'successful','task_id':res.task_id})


# proj02/urls.py
from django.contrib import admin
from django.urls import path
from app02.views import index

urlpatterns = [
    path('admin/', admin.site.urls),
    path('', index),
]

  

4.3、运行代码

启动django服务:

在项目目录下,此项目目录是/root/proj02,执行如下命令:

python manage.py runserver 0.0.0.0:8888

 

启动celery服务:

在项目目录下,此项目目录是/root/proj02,执行如下命令:

celery -A proj02.celery worker -l info

 

调用接口:

http://<django服务器地址ip>:8888

 

4.4、结果

 

{"status": "successful", "task_id": "fa1dc915-b1c2-4250-8557-2172d00febf1"}

 

 

扩展:task结果存储

CELERY_RESULT_BACKEND

1、概述

 除了redis、rabbitmq能做结果存储外,还可以使用Django的orm作为结果存储,当然需要安装依赖插件,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作。

 

2、配置

安装依赖:

pip install django-celery-results

 

配置settings.py,注册app:

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

  

修改backend配置,将redis改为django-db:

#CELERY_RESULT_BACKEND = 'redis://127.0.0.0:6479/0'
CELERY_RESULT_BACKEND = 'django-db'

  

同步数据库:

python manage.py migrate django_celery_results

  

3、使用

查看数据库表(django_celery_results),每个任务执行后的结果,存储在这里。

 

 

五、django Celery定时任务

5.1、 概述

实现任务功能的定时执行,不用人工手动触发,但局限是不能动态自定义定时任务,此处在代码里写死。

 

5.2、代码变动

tip:在上一步(第四章 django Celery)的代码基础上改动,红色为变动内容

# proj02/celery_schedule.py
# 定时任务执行的功能函数
from datetime import timedelta
from celery.schedules import crontab

## 定时任务方式一
CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'app02.tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (16, 16)
    }
}

## 定时任务方式二,此处先注释,方便代码后面可以直接跑起来
#CELERYBEAT_SCHEDULE = {
#    'add': {
#        'task': 'app02.tasks.add',
#        'schedule': crontab(hour=7, minute=30, day_of_week=1),
#        'args': (16, 16)
#    }
#}

  

# proj02/settings.py
# 新增对celery_schedule.py文件的引用
from proj02.celery_schedule import CELERYBEAT_SCHEDULE
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app02',
    'django_celery_beat',
    'django_celery_results',   # 这个是本页`扩展`章节的内容
]

  

5.3、运行代码

启动django服务:

在项目目录下,此项目目录是/root/proj02,执行如下命令:

python manage.py runserver 0.0.0.0:8888

 

启动celery服务:

在项目目录下,此项目目录是/root/proj02,执行如下命令:

celery -A proj02.celery worker -l info --beat

 

5.4、结果

会按照定时设置,自动执行任务。

 

 

六、django管理后端自定义定时任务

6.1、启动服务

启动django服务:

在项目目录下,此项目目录是/root/proj02,执行如下命令:

python manage.py runserver 0.0.0.0:8888

 

启动celery服务(特别注意启动时加上参数,红色标记):

在项目目录下,此项目目录是/root/proj02,执行如下命令:

celery -A proj02.celery worker -l info  --beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

 

6.2、界面配置使用

 登录管理后台,看到此界面,如下:

 

 详细配置,不具体介绍了,自己点点试试,很快就能知道。

 

 

七、自定义动态定时任务

7.1、概述

django-celery-beat插件本质上是对数据库表变化检查,一旦有数据库表改变,调度器重新读取任务进行调度,所以如果想自己定制的任务页面,只需要操作beat插件的四张表就可以了。

 

7.2、实例代码

后续补充

 

 

八、celery监控

 https://www.cnblogs.com/zivli/p/11517797.html

后续补充

 

推荐阅读