python-3.x - 气流DAG调度月的最后一天-n天
问题描述
我想安排我的 dag 在一个月的最后一天前 3 天运行,所以 2 月我的 dag 应该在 25 日运行,而 3 月的 dag 应该在 28 日运行。关于如何安排这个的任何想法?
谢谢
解决方案
对于气流 < 2.2.0:
只有当您可以在单个 cron 表达式中“说出来”时,您才能安排 DAG。如果您的调度愿望不适合 cron 表达式,那么您不能将其设置为开箱即用。但是,您可以找到一个与您想要的足够接近的 cron 表达式(0 0 25-31 * *
- 从 28 到 31 的每一天),并将 aShortCircuitOperator
放在 DAG 的开头,以验证日期是否实际上是 3 天前月底。如果日期匹配,它将继续执行下游任务,如果日期不匹配,它将跳过下游任务:
import calendar
from datetime import datetime, date, timedelta
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 8, 21)
}
def check_if_last_day_of_month(execution_date):
# calendar.monthrange return a tuple (weekday of first day of the
# month, number
# of days in month)
run_date = datetime.fromtimestamp(execution_date.timestamp())
last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
# check if date is 3 days behind the last day of the month
if run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3):
return True
return False
with DAG(
dag_id='short_example',
schedule_interval="@once",
default_args=default_args,
) as dag:
first = ShortCircuitOperator(
task_id='verify_date',
python_callable=check_if_last_day_of_month
)
second = DummyOperator(task_id='task')
first >> second
示例运行2021-01-30
:
示例运行2021-01-28
:
注意:确保您正在比较您感兴趣的日期。在示例中,我比较execution_date
了 DAG 的。
对于 Airflow >= 2.2.0:(当前测试版 2.2.0b2)
AIP-39 更丰富的 scheduler_interval可用。您可以为调度定义自己的时间表。您可以在PR中阅读此功能的文档。
推荐阅读
- c# - 保存图形文件
- sql - 加入后的标识符无效
- angular - Angular Material 以编程方式添加选项卡
- java - Apache HttpAsyncClient 线程计数配置
- android - BLE onCharacteristicRead 接收状态 133
- android - Android 安装引荐来源网址:pcampaignid=APPU_2
- node.js - Express NodeJS 应用程序中的解析错误响应
- scala - 声明 SparkContext 类型并从 spark-cassandra-connector 访问 cassandraTable
- homebrew - 无法确定链接的 PHP - Homebrew
- python - Python Pandas - 根据匹配条件创建一个新的 df 列