首页 > 技术文章 > 一、初探Celery

yaya625202 2018-11-30 13:22 原文

编辑本文章

一、安装RabbitMQ:

 安装:yum install rabbitmq-server
    启动:service rabbitmq-server start

    添加web管理:rabbitmq-plugins enable rabbitmq_management

  默认账号是guest
    创建用户:rabbitmqctl add_user test 123.com
    创建虚拟主机:rabbitmqctl add_vhost testhost
    给用户test授权访问testhost主机:rabbitmqctl set_permissions -p testhost test ".*" ".*" ".*"    

二、安装Redis:

 安装并启动redis server
   安装并启动redis server

    sudo service redis-server start
    报错:redis==3.0.1
        File "/data/project/env/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems
        return iter(x.items())
        AttributeError: 'float' object has no attribute 'items'
    解决办法:
        用低版本的Redis的pylib即可
        pip install redis==2.10.6

三、测试脚本

 写测试脚本tasks.py

from celery import Celery
app=Celery('tasks',broker='redis://127.0.0.1:6379/0')

@app.task
def add(x,y):
    return x+y

四、启动Celery:

    celery -A tasks worker --loglevel=info

 这个tasks参数是我们刚才创建的tasks模块
    /data/project/mcelery/env/lib/python3.5/site-packages/celery/platforms.py:796: RuntimeWarning: You're running the worker with superuser privileges: this is
    absolutely not recommended!
    Please specify a different user using the --uid option.
    User information: uid=0 euid=0 gid=0 egid=0
    uid=uid, euid=euid, gid=gid, egid=egid,
    -------------- celery@jdweb.cqyy.local v4.2.1 (windowlicker)
    ---- **** -----
    --- * ***  * -- Linux-3.10.0-862.14.4.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2018-11-30 12:54:28
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x7fade4de06d8
    - ** ---------- .> transport:   redis://127.0.0.1:6379/0
    - ** ---------- .> results:     disabled://
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    [tasks]
      . tasks.add

    [2018-11-30 12:54:28,656: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
    [2018-11-30 12:54:28,678: INFO/MainProcess] mingle: searching for neighbors
    [2018-11-30 12:54:29,720: INFO/MainProcess] mingle: all alone
    [2018-11-30 12:54:29,733: INFO/MainProcess] celery@jdweb.cqyy.local ready.
    说明:
        1、Celery运行于jdweb.cqyy.local上,版本为v4.2.1
        2、平台为centos7.5
        3、app名称为tasks
        4、传输工具默认的redis,数据库为0
        5、结果存储未配置,是disabled
        6、用prefork来执行并发,当前并发为4
        7、任务监控已经关闭
        8、默认队列celery
    参数:
        -A tasks为指定APP的名称,即我们创建app时给Celery传的第一个参数

五、测试:

    1、另启一python shell
    2、从我们测试脚本中导入add方法
    3、执行add的delay方法,并传入add方法的参数
    >>> add.delay(1,2)
    <AsyncResult: ccd6c32e-7cde-493d-ba9e-d1ef3745b04b>
    >>> add.delay(5,6)
    <AsyncResult: 19500780-d493-46d2-9266-60ab8c0e0ce3>
    >>>

 每个任务都会返回一个AsyncResult实例,用于检查任务状态和异常回溯,该功能默认是关闭的

>>> s.ready()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/data/project/mcelery/env/lib/python3.5/site-packages/celery/result.py", line 311, in ready
    return self.state in self.backend.READY_STATES
  File "/data/project/mcelery/env/lib/python3.5/site-packages/celery/result.py", line 471, in state
    return self._get_task_meta()['status']
  File "/data/project/mcelery/env/lib/python3.5/site-packages/celery/result.py", line 410, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/data/project/mcelery/env/lib/python3.5/site-packages/celery/backends/base.py", line 365, in get_task_meta
    meta = self._get_task_meta_for(task_id)

    查看任务执行情况:
    [2018-11-30 13:17:36,416: INFO/MainProcess] Received task: tasks.add[ccd6c32e-7cde-493d-ba9e-d1ef3745b04b]  
    [2018-11-30 13:17:36,418: INFO/ForkPoolWorker-2] Task tasks.add[ccd6c32e-7cde-493d-ba9e-d1ef3745b04b] succeeded in 0.00017630800721235573s: 3
    [2018-11-30 13:17:48,781: INFO/MainProcess] Received task: tasks.add[19500780-d493-46d2-9266-60ab8c0e0ce3]  
    [2018-11-30 13:17:48,783: INFO/ForkPoolWorker-3] Task tasks.add[19500780-d493-46d2-9266-60ab8c0e0ce3] succeeded in 0.00018357901717536151s: 11
    任务是根据任务ID来区分的,后面表示执行的结果

六、报错处理

报错动作:测试时通过delay()添加任务

错误来源:Redis redis==3.0.1
        File "/data/project/env/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems
        return iter(x.items())
        AttributeError: 'float' object has no attribute 'items'
解决办法:
        用低版本的Redis的pylib即可
        pip install redis==2.10.6

推荐阅读