首页 > 解决方案 > Celery 在使用 app.control.purge() 时运行任务会发生什么?

问题描述

目前我有一个用 django 运行的芹菜批次,如下所示:

芹菜.py:

from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django

load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
   app.control.purge()
   sender.add_periodic_task(30.0, check_loop.s())
   recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
   print("setup_periodic_tasks")

@app.task()
def check_loop():
    .....
    start = database start number
    end = database end number
    callling apis in a list from id=start to id=end
    create objects
    update database(start number = end, end number = end + 3)

    ....


@app.task()
def recursion_function(default_retry_delay=10):
   .....
   do some looping
   ....
   #when finished, call itself again
   recursion_function.apply_async(countdown=30)

我的目标是每当芹菜文件被编辑然后它会重新启动所有任务 - 删除尚未执行的排队任务(我这样做是因为recursion_function如果它完成检查我的数据库中表的每条记录的工作,它将再次运行它自己所以我不担心它会在中途停下来)。

check_loop函数将调用具有分页以返回对象列表的api,我将通过表中的记录将其与表中的记录进行比较,如果匹配则创建另一个模型的新自定义记录

我的问题是当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果check_loop函数中途停止循环遍历 api 列表,那么它将再次运行循环,我将创建我不想要的新重复记录

例子:

check_loop()它的运行任务中途创建对象(在从元素id = 2到id = 5的api列表上),服务器重新启动->再次运行,现在check_loop()从开始运行(在从元素id = 2到id = 5的api列表上)并创建再次从该列表中删除对象(我不想要 100%)

这是它的运行方式吗?我只需要确认

编辑:

https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks

我补充说app.control.purge(),因为当我重新启动时,在之前的执行recursion_function中再次被调用,所以它会自我繁殖setup_periodic_tasksrecursion_functionrecursion_function.apply_async(countdown=30)

标签: djangopython-3.xcelerycelerybeat

解决方案


我不会写像上面 Oleg 的优秀文章那样的文章。答案很简单——所有正在运行的任务将继续运行purge都是关于队列中的任务,等待 Celery 工人挑选。


推荐阅读