flask - Flask:根据日期时间更改 Celery add_periodic_task 的频率
问题描述
我有一个 Flask 应用程序,运行 Flask 2.0.1 和 Celery 5.1.2,它运行一个 Celery 工作者,worker.py
. 我希望这段worker.py
代码足以描述我正在尝试做的事情,但如有必要,我可以添加其他文件:
from celery import Celery
from . import create_worker_app
from src.scraper.areas import scrape_areas
from src.scraper.contests import scrape_contests
from src.scraper.meta import scrape_meta
from src.scraper.questions import scrape_questions
from src.scraper.results import scrape_results
def create_celery(app):
celery = Celery(
app.import_name,
backend=app.config["RESULT_BACKEND"],
broker=app.config["CELERY_BROKER_URL"],
)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
flask_app = create_worker_app()
celery = create_celery(flask_app)
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls tasks at the desired frequency
# daily tasks
sender.add_periodic_task(86400.0, scrape_areas, name="scrape areas every day")
sender.add_periodic_task(86400.0, scrape_contests, name="scrape contests every day")
sender.add_periodic_task(86400.0, scrape_meta, name="scrape meta every day")
sender.add_periodic_task(86400.0, scrape_questions, name="scrape questions every day")
sender.add_periodic_task(86400.0, scrape_results, name="scrape results every day")
上述行为创建了一个每 24 小时运行每个方法(存在于其他文件中)的任务,这通常是我想要的。不过,我希望能够以scrape_results
几种方式覆盖该方法的这种行为:
- 默认行为(每 24 小时运行一次)在大多数情况下都很好。
- 我想检查当前日期时间是否在另外两个日期时间之间(我可以用 来做到这一点
DateTimeRange
),所以它会是这样的:
from datetime import datetime
import pytz
from datetimerange import DateTimeRange
now = datetime.now(pytz.timezone('America/Chicago'))
time_range = DateTimeRange(current_app.config["ELECTION_DAY_RESULT_HOURS_START"], current_app.config["ELECTION_DAY_RESULT_HOURS_END"])
now_formatted = now.isoformat()
if now_formatted in time_range:
我不知道把它放在哪里,因为setup_periodic_tasks
似乎只运行一次,但我想做的是如果上面的代码是True
,scrape_results
会更频繁地运行。也许每三分钟一次。
如果可以的话,我也希望能够用一个.env
值覆盖它。所以它在那个时间窗口内会有一个默认值,但是那个时间窗口可能会更高或更低。
无论如何,我很难理解这是否可能(尽管它似乎可能是从Celery 4.1.0开始的)。如果是这样,我不确定在哪里放置条件。
所以,总结一下:
使用 时add_periodic_task
,是否可以根据 Flask 知道的其他内容更改频率值,即当前日期时间和.env
应用程序在其配置中加载的某些值?
解决方案
推荐阅读
- image - 如何将 h264 I Frame 解码为 RAW 图像?
- mysql - MySQL查询上的子字符串
- assembly - 将大于 8 位的值存储到 8 位寄存器的结果
- matlab - 强制 Matlab 在显示答案之前回显每个命令
- javascript - 正则表达式拆分 url 和文本以制作自定义 html 内容
- php - PHPUnit - 调用断言特征方法
- javascript - 从javascript ES5中的数组中获取未定义的项目
- maven - 带有 Jcabi Maven 插件的 AspectJ 二进制编织不适用于 Kotlin 代码
- python - 如何使用 os.makedirs 在目录树的特定位置创建多个目录?
- asp.net-web-api - 如何使用 Google 身份验证在 C# Web API 内部进行身份验证?