首页 > 解决方案 > 在 Apache 气流中实现跨 DAG 依赖

问题描述

我正在尝试在 2 个 DAG 之间实现 DAG 依赖关系,例如 A 和 B。DAG A 每小时运行一次,DAG B 每 15 分钟运行一次。

  1. 每次 DAG B 启动时,它都会运行,我想确保 DAG A 未处于运行状态。
  2. 如果发现 DAG A 正在运行,则 DAG B 必须等到 DAG A 完成运行。
  3. 如果 DAG A 未运行,则 DAG B 可以继续其任务。

DAG A:

from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'dependency',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 10, 10, 1),
    'email': ['xxxx.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('DAG_A', schedule_interval='0/60 * * * *',max_active_runs=1, catchup=False,
         default_args=default_args) as dag:

    task1 = DummyOperator(task_id='task1', retries=1, dag=dag)
    task2 = DummyOperator(task_id='task2', retries=1, dag=dag)
    task3 = DummyOperator(task_id='task3', retries=1, dag=dag)

    task1 >> task2 >> task3

DAG B:

from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'dependency',
    'depends_on_past': False,
    'start_date': datetime(2020, 9, 10, 10, 1),
    'email': ['xxxx.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('DAG_B', schedule_interval='0/15 * * * *',max_active_runs=1, catchup=False,
        default_args=default_args) as dag:

    task4 = DummyOperator(task_id='task4', retries=1, dag=dag)
    task5 = DummyOperator(task_id='task5', retries=1, dag=dag)
    task6 = DummyOperator(task_id='task6', retries=1, dag=dag)

    task4 >> task5 >> task6

我尝试过使用 ExternalTask​​Sensor 运算符。我无法理解传感器是否发现 DAG A 处于成功状态,它会触发下一个任务,否则等待任务完成。

提前致谢。

标签: airflowdirected-acyclic-graphs

解决方案


您可以使用ExternalTask​​Sensor来实现您正在寻找的东西。关键方面是用正确的初始化这个传感器execution_date,在你的例子execution_date中是最后DagRun一个DAG_A。检查此示例,其中DAG_A每 9 分钟运行 200 秒。 DAG_B每 3 分钟运行一次,运行时间为 30 秒。这些值是任意的,仅用于演示目的,几乎可以是任何值。

DAG A(这里没有什么新鲜事):

import time
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def _executing_task(**kwargs):
    print("Starting task_a")
    time.sleep(200)
    print("Completed task_a")


dag = DAG(
    dag_id="example_external_task_sensor_a",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="*/9 * * * *",
    tags=['example_dags'],
    catchup=False
)
with dag:

    start = DummyOperator(
        task_id='start')

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=_executing_task,
    )

chain(start, task_a)

DAG B:

import time
from airflow import DAG
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor


def _executing_task():
    time.sleep(30)
    print("Completed task_b")


@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None,  **kwargs):
    dag_a_last_run = get_last_dagrun(
        'example_external_task_sensor_a', session)
    print(dag_a_last_run)
    print(f"EXEC DATE: {dag_a_last_run.execution_date}")
    return dag_a_last_run.execution_date


dag = DAG(
    dag_id="example_external_task_sensor_b",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="*/3 * * * *",
    tags=['example_dags'],
    catchup=False
)
with dag:

    start = DummyOperator(
        task_id='start')

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_dag_id='example_external_task_sensor_a',
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of_dag_a,
        poke_interval=30
    )
    task_b = PythonOperator(
        task_id='task_b',
        python_callable=_executing_task,
    )

chain(start, wait_for_dag_a,  task_b)

我们正在使用 的参数execution_date_fnExternalTaskSensor获取execution_dateDAG_A 的最后一个DagRun,如果我们不这样做,它将等待DAG_A与 DAG_B 的实际运行相同execution_date这在许多情况下可能不存在。

该函数使用来自 Airflow 模型_get_execution_date_of_dag_a的方法对元数据数据库进行查询以获取 exec_date 。get_last_dagrun

最后,另一个重要参数是allowed_states=['success', 'failed']我们告诉它等待直到在其中一种状态中找到DAG_A(即,如果它处于running状态将继续执行 poke)。

试试看,让我知道它是否对你有用!


推荐阅读