airflow - 如何用airflow实现更复杂的dag调度?
问题描述
Airflow 可以轻松地以固定的时间间隔运行作业。这篇文章寻求有关如何处理更复杂的调度要求的建议。
例如,假设我有从 SFTP 服务器提取文件并对其进行处理的进程。源仅发布文件 MF。我希望 dag 的行为方式如下:
- 只运行MF;
- 在星期一,查找来自
execution_date - 0
and- 1
的文件- 2
- 周二至周五,只是寻找
execution date - 0
?
似乎这实现起来并不实际,我需要做的只是设计它以提取碰巧在那里的任何文件并每天运行,而不参考特定文件。
问题是,如果我可以指定驱动的文件,execution_date
那么我可以准确地看到已提取和未提取的内容,并使用重试功能。
想到的一种方法是每周创建 7 个 dags。但我不喜欢这个主意。
另一种情况是,如果我希望每个月的第二个星期日运行一个进程。有没有办法做这种事情?
编辑:我认为实现这一目标的最简洁方法是将 dag 设计为始终使用 date 提取文件execution_date
,但直到星期一才触发 sat 和 sun 运行(并使用 trigger dag 运算符这样做),并使用控制器 dag 与 BranchOperator 和 TriggerDagOperator 来实现这一点。
解决方案
将 DAG 设置为'schedule_interval':
在 '0 0 * * 1-5'
每周一到周五的每一天的 00:00 运行。根据需要调整时间(前两个零)。
接下来,BranchPythonOperator
用作 DAG 的一种进入方式。因此,在星期一,DAG 被执行并查找带有execution_date - 0
、execution_date - 1
和的文件execution_date - 2
。周二到周五,它只是寻找execution_date - 0
.
我创建了一个快速示例来向您展示我的意思。我希望这是一个足够的例子。让我知道我是否可以进一步提供帮助。
#Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
# General imports
from datetime import datetime
DAG_ID = 'stackoverflow_exampledag'
args = {
'owner': 'you',
'email': ['you@yourwork.com'],
'depends_on_past': False,
'email_on_retry': False,
'email_on_failure': True,
'start_date': datetime(2019, 4, 14)
}
dag = DAG(
dag_id=DAG_ID,
default_args=args,
schedule_interval="0 0 * * 1-5"
)
#################################
######## Python Script ##########
#################################
def checktheday(**kwargs):
weekday = datetime.today().weekday()
if weekday == 1:
return 'monday_only_task'
else:
return 'tuesday_through_friday_task'
####################################
########## TASKS ###################
####################################
# BranchPythonOperator is the entry point for this DAG.
# The python callable will return the task id of the appriorate subdag/task that it's supposed to run.
checktheday_task = BranchPythonOperator(
task_id='checktheday_task',
python_callable=checktheday,
dag=dag,
provide_context=True
)
monday_only_task = DummyOperator(
task_id='monday_only_task',
dag=dag
)
tuesday_through_friday_task = DummyOperator(
task_id='tuesday_through_friday_task',
dag=dag
#################################
########## ORCHESTRATION ########
#################################
monday_only_task.set_upstream(checktheday_task)
tuesday_through_friday_task.set_upstream(checktheday_task)
推荐阅读
- java - 使用 jayway JsonPath 解析 Json 时保持尾随零
- c# - CS0029:无法将类型“int”隐式转换为“bool”
- sql - 如何连接到数据库并从nodejs中的多个表中获取数据?
- angularjs - 在 angular.forEach 函数中使用 $http.post() 上传图像
- android - 是否可以“欺骗”Android Room Persistence Library 的 UI 初始化/启动异步性?
- facebook-graph-api - 在开发模式下访问应用程序的 Facebook API Pages 节点的权限
- c# - 使用 asp.net 核心 cookie 身份验证注销后防止自动 Windows 身份验证登录
- sql - 如何让 ADO 将 .csv 文件中的所有日期识别为美国日期?
- xamarin.forms - 带有导航页面的单个静态页面
- javascript - 如何从数据库向本地计算机发出实时通知