airflow - 在 Apache 气流中实现跨 DAG 依赖
问题描述
我正在尝试在 2 个 DAG 之间实现 DAG 依赖关系,例如 A 和 B。DAG A 每小时运行一次,DAG B 每 15 分钟运行一次。
- 每次 DAG B 启动时,它都会运行,我想确保 DAG A 未处于运行状态。
- 如果发现 DAG A 正在运行,则 DAG B 必须等到 DAG A 完成运行。
- 如果 DAG A 未运行,则 DAG B 可以继续其任务。
DAG A:
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'dependency',
'depends_on_past': False,
'start_date': datetime(2020, 9, 10, 10, 1),
'email': ['xxxx.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('DAG_A', schedule_interval='0/60 * * * *',max_active_runs=1, catchup=False,
default_args=default_args) as dag:
task1 = DummyOperator(task_id='task1', retries=1, dag=dag)
task2 = DummyOperator(task_id='task2', retries=1, dag=dag)
task3 = DummyOperator(task_id='task3', retries=1, dag=dag)
task1 >> task2 >> task3
DAG B:
from datetime import datetime,timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'dependency',
'depends_on_past': False,
'start_date': datetime(2020, 9, 10, 10, 1),
'email': ['xxxx.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('DAG_B', schedule_interval='0/15 * * * *',max_active_runs=1, catchup=False,
default_args=default_args) as dag:
task4 = DummyOperator(task_id='task4', retries=1, dag=dag)
task5 = DummyOperator(task_id='task5', retries=1, dag=dag)
task6 = DummyOperator(task_id='task6', retries=1, dag=dag)
task4 >> task5 >> task6
我尝试过使用 ExternalTaskSensor 运算符。我无法理解传感器是否发现 DAG A 处于成功状态,它会触发下一个任务,否则等待任务完成。
提前致谢。
解决方案
您可以使用ExternalTaskSensor来实现您正在寻找的东西。关键方面是用正确的初始化这个传感器execution_date
,在你的例子execution_date
中是最后DagRun
一个DAG_A。检查此示例,其中DAG_A每 9 分钟运行 200 秒。 DAG_B每 3 分钟运行一次,运行时间为 30 秒。这些值是任意的,仅用于演示目的,几乎可以是任何值。
DAG A(这里没有什么新鲜事):
import time
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
def _executing_task(**kwargs):
print("Starting task_a")
time.sleep(200)
print("Completed task_a")
dag = DAG(
dag_id="example_external_task_sensor_a",
default_args={"owner": "airflow"},
start_date=days_ago(1),
schedule_interval="*/9 * * * *",
tags=['example_dags'],
catchup=False
)
with dag:
start = DummyOperator(
task_id='start')
task_a = PythonOperator(
task_id='task_a',
python_callable=_executing_task,
)
chain(start, task_a)
DAG B:
import time
from airflow import DAG
from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
def _executing_task():
time.sleep(30)
print("Completed task_b")
@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
dag_a_last_run = get_last_dagrun(
'example_external_task_sensor_a', session)
print(dag_a_last_run)
print(f"EXEC DATE: {dag_a_last_run.execution_date}")
return dag_a_last_run.execution_date
dag = DAG(
dag_id="example_external_task_sensor_b",
default_args={"owner": "airflow"},
start_date=days_ago(1),
schedule_interval="*/3 * * * *",
tags=['example_dags'],
catchup=False
)
with dag:
start = DummyOperator(
task_id='start')
wait_for_dag_a = ExternalTaskSensor(
task_id='wait_for_dag_a',
external_dag_id='example_external_task_sensor_a',
allowed_states=['success', 'failed'],
execution_date_fn=_get_execution_date_of_dag_a,
poke_interval=30
)
task_b = PythonOperator(
task_id='task_b',
python_callable=_executing_task,
)
chain(start, wait_for_dag_a, task_b)
我们正在使用 的参数execution_date_fn
来ExternalTaskSensor
获取execution_date
DAG_A 的最后一个DagRun,如果我们不这样做,它将等待DAG_A与 DAG_B 的实际运行相同,execution_date
这在许多情况下可能不存在。
该函数使用来自 Airflow 模型_get_execution_date_of_dag_a
的方法对元数据数据库进行查询以获取 exec_date 。get_last_dagrun
最后,另一个重要参数是allowed_states=['success', 'failed']
我们告诉它等待直到在其中一种状态中找到DAG_A(即,如果它处于running
状态将继续执行 poke)。
试试看,让我知道它是否对你有用!
推荐阅读
- reactjs - TypeError:无法读取 react-redux 中未定义的属性“产品”
- python - 如何使垂直堆叠的对偶图的顶部图更小?
- java - Java:ReentrantLock 不起作用它仍然相互混合
- python - 如何删除每行特殊字符串之后的字符串
- arrays - 如何在C中删除数组中的字符串
- swift - 延迟添加注释直到搜索完成
- c# - 在 C# 中重命名和删除文件夹
- javascript - 在 javascript 中使用 Bootstrap 行和列
- javascript - 在 Angular 中处理外部点击引导模式
- python - Python - 编辑文本文件 - 特定案例