Celery receives periodic tasks but does not execute them

Problem description

I use Celery to run periodic tasks in my Django DRF application. Unfortunately, the registered tasks are never executed.

Project structure:

project_name
├── cron_tasks
│   ├── __init__.py
│   └── celery.py

celery.py:

from celery import Celery, shared_task
from django.conf import settings

app = Celery('cron_tasks', include=['cron_tasks.celery'])
app.conf.broker_url = settings.RABBITMQ_URL
app.autodiscover_tasks()
app.conf.redbeat_redis_url = settings.REDBEAT_REDIS_URL
app.conf.broker_pool_limit = 1
app.conf.broker_heartbeat = None
app.conf.broker_connection_timeout = 30
app.conf.worker_prefetch_multiplier = 1


app.conf.beat_schedule = {
    'first_warning_overdue': {
        'task': 'cron_tasks.celery.test_task',
        'schedule': 60.0, # seconds
        'options': {'queue': 'default', 'expires': 43100.0}
    }
}

@shared_task
def test_task():
    app.send_task('cron_tasks.celery.test_action')

def test_action():
    print('action!') # print is not executed

    # I also tried to change the data, but it never happens too.
    from django.contrib.auth import get_user_model
    u = get_user_model().objects.get(id=1)
    u.first_name = "testttt"
    u.save()

settings.py:

RABBITMQ_URL = os.environ.get('RABBITMQ_URL')
REDBEAT_REDIS_URL = os.environ.get('REDBEAT_REDIS_URL')
CELERY_BROKER_URL = os.environ.get('RABBITMQ_URL')
CELERYD_TASK_SOFT_TIME_LIMIT = 60
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = os.environ.get('REDBEAT_REDIS_URL')
CELERY_IMPORTS = ("cron_tasks.celery", )

from kombu import Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
)
CELERY_CREATE_MISSING_QUEUES = True
redbeat_redis_url = REDBEAT_REDIS_URL

RabbitMQ is running correctly. I can see it in the celery worker terminal output:

- ** ---------- .> transport:   amqp://admin:**@localhost:5672/my_vhost

Redis is also working fine; I use it for the RedBeat scheduler. I run:

celery beat -S redbeat.RedBeatScheduler -A cron_tasks.celery:app  --loglevel=debug

which shows:

[2019-02-15 09:32:44,477: DEBUG/MainProcess] beat: Waking up in 10.00 seconds.
[2019-02-15 09:32:54,480: DEBUG/MainProcess] beat: Extending lock...
[2019-02-15 09:32:54,481: DEBUG/MainProcess] Selecting tasks
[2019-02-15 09:32:54,482: INFO/MainProcess] Loading 1 tasks
[2019-02-15 09:32:54,483: INFO/MainProcess] Scheduler: Sending due task first_warning_overdue (cron_tasks.celery.test_task)
[2019-02-15 09:32:54,484: DEBUG/MainProcess] cron_tasks.celery.test_task sent. id->f89083aa-11dc-41fc-9ebe-541840951f8f

The celery worker is started like this:

celery worker -Q default -A cron_tasks.celery:app -n .%%h --without-gossip --without-mingle --without-heartbeat --loglevel=info  --max-memory-per-child=512000

It says:

-------------- celery@.%me.local v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Darwin-16.7.0-x86_64-i386-64bit 2019-02-15 09:31:50
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         cron_tasks:0x10e2a5ac8
- ** ---------- .> transport:   amqp://admin:**@localhost:5672/my_vhost
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . cron_tasks.celery.test_task

[2019-02-15 09:31:50,833: INFO/MainProcess] Connected to amqp://admin:**@127.0.0.1:5672/my_vhost
[2019-02-15 09:31:50,867: INFO/MainProcess] celery@.%me.local ready.
[2019-02-15 09:41:46,218: INFO/MainProcess] Received task: cron_tasks.celery.test_task[3c121f04-af3b-4cbe-826b-a32da6cc156e]   expires:[2019-02-15 21:40:05.779231+00:00]
[2019-02-15 09:41:46,220: INFO/ForkPoolWorker-2] Task cron_tasks.celery.test_task[3c121f04-af3b-4cbe-826b-a32da6cc156e] succeeded in 0.001324941000000024s: None

Expected behavior: this should run my test_action(). However, even though the celery worker output shows succeeded in 0.001324941000000024s, that function never executes.

Tags: python, django, redis, rabbitmq, celery

Solution
