首页 > 解决方案 > Flask:根据日期时间更改 Celery add_periodic_task 的频率

问题描述

我有一个 Flask 应用程序,运行 Flask 2.0.1 和 Celery 5.1.2,它运行一个 Celery 工作者,worker.py. 我希望这段worker.py代码足以描述我正在尝试做的事情,但如有必要,我可以添加其他文件:

from celery import Celery

from . import create_worker_app
from src.scraper.areas import scrape_areas
from src.scraper.contests import scrape_contests
from src.scraper.meta import scrape_meta
from src.scraper.questions import scrape_questions
from src.scraper.results import scrape_results

def create_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config["RESULT_BACKEND"],
        broker=app.config["CELERY_BROKER_URL"],
    )
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


flask_app = create_worker_app()
celery = create_celery(flask_app)


@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls tasks at the desired frequency

    # daily tasks
    sender.add_periodic_task(86400.0, scrape_areas, name="scrape areas every day")
    sender.add_periodic_task(86400.0, scrape_contests, name="scrape contests every day")
    sender.add_periodic_task(86400.0, scrape_meta, name="scrape meta every day")
    sender.add_periodic_task(86400.0, scrape_questions, name="scrape questions every day")
    sender.add_periodic_task(86400.0, scrape_results, name="scrape results every day")

上述行为创建了一个每 24 小时运行每个方法(存在于其他文件中)的任务,这通常是我想要的。不过,我希望能够以scrape_results几种方式覆盖该方法的这种行为:

  1. 默认行为(每 24 小时运行一次)在大多数情况下都很好。
  2. 我想检查当前日期时间是否在另外两个日期时间之间(我可以用 来做到这一点DateTimeRange),所以它会是这样的:
from datetime import datetime
import pytz
from datetimerange import DateTimeRange
now = datetime.now(pytz.timezone('America/Chicago'))
time_range = DateTimeRange(current_app.config["ELECTION_DAY_RESULT_HOURS_START"], current_app.config["ELECTION_DAY_RESULT_HOURS_END"])
now_formatted = now.isoformat()
if now_formatted in time_range:

我不知道把它放在哪里,因为setup_periodic_tasks似乎只运行一次,但我想做的是如果上面的代码是Truescrape_results会更频繁地运行。也许每三分钟一次。

如果可以的话,我也希望能够用一个.env值覆盖它。所以它在那个时间窗口内会有一个默认值,但是那个时间窗口可能会更高或更低。

无论如何,我很难理解这是否可能(尽管它似乎可能是从Celery 4.1.0开始的)。如果是这样,我不确定在哪里放置条件。

所以,总结一下:

使用 时add_periodic_task,是否可以根据 Flask 知道的其他内容更改频率值,即当前日期时间和.env应用程序在其配置中加载的某些值?

标签: flaskceleryperiodic-task

解决方案


推荐阅读