首页 > 解决方案 > 如何用airflow实现更复杂的dag调度?

问题描述

Airflow 可以轻松地以固定的时间间隔运行作业。这篇文章寻求有关如何处理更复杂的调度要求的建议。

例如,假设我有从 SFTP 服务器提取文件并对其进行处理的进程。源仅发布文件 MF。我希望 dag 的行为方式如下:

似乎这实现起来并不实际,我需要做的只是设计它以提取碰巧在那里的任何文件并每天运行,而不参考特定文件。

问题是,如果我可以指定驱动的文件,execution_date那么我可以准确地看到已提取和未提取的内容,并使用重试功能。

想到的一种方法是每周创建 7 个 dags。但我不喜欢这个主意。

另一种情况是,如果我希望每个月的第二个星期日运行一个进程。有没有办法做这种事情?

编辑:我认为实现这一目标的最简洁方法是将 dag 设计为始终使用 date 提取文件execution_date,但直到星期一才触发 sat 和 sun 运行(并使用 trigger dag 运算符这样做),并使用控制器 dag 与 BranchOperator 和 TriggerDagOperator 来实现这一点。

标签: airflow

解决方案


将 DAG 设置为'schedule_interval':'0 0 * * 1-5'每周一到周五的每一天的 00:00 运行。根据需要调整时间(前两个零)。

接下来,BranchPythonOperator用作 DAG 的一种进入方式。因此,在星期一,DAG 被执行并查找带有execution_date - 0execution_date - 1和的文件execution_date - 2。周二到周五,它只是寻找execution_date - 0.

我创建了一个快速示例来向您展示我的意思。我希望这是一个足够的例子。让我知道我是否可以进一步提供帮助。

#Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
# General imports
from datetime import datetime

DAG_ID = 'stackoverflow_exampledag'

args = {
    'owner': 'you',
    'email': ['you@yourwork.com'],
    'depends_on_past': False,
    'email_on_retry': False,
    'email_on_failure': True,
    'start_date': datetime(2019, 4, 14)
}

dag = DAG(
    dag_id=DAG_ID,
    default_args=args,
    schedule_interval="0 0 * * 1-5"
    )


#################################
######## Python Script ##########
#################################


def checktheday(**kwargs):
    weekday = datetime.today().weekday()
    if weekday == 1:
        return 'monday_only_task'
    else:
        return 'tuesday_through_friday_task'


####################################
########## TASKS ###################
####################################

# BranchPythonOperator is the entry point for this DAG.
# The python callable will return the task id of the appriorate subdag/task that it's supposed to run.

checktheday_task = BranchPythonOperator(
    task_id='checktheday_task',
    python_callable=checktheday,
    dag=dag,
    provide_context=True
    )

monday_only_task = DummyOperator(
    task_id='monday_only_task',
    dag=dag
    )

tuesday_through_friday_task = DummyOperator(
    task_id='tuesday_through_friday_task',
    dag=dag


#################################
########## ORCHESTRATION ########
#################################
monday_only_task.set_upstream(checktheday_task)
tuesday_through_friday_task.set_upstream(checktheday_task)

示例图表视图


推荐阅读