python - 如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?
问题描述
我有一个 python DAGParent Job
和 DAG Child Job
。成功完成每日运行的任务后,Child Job
应触发中的任务。Parent Job
如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')
execute_notebook = PostgresOperator(
task_id='data_sql',
postgres_conn_id='REDSHIFT_CONN',
sql="SELECT * FROM athena_rs.shipments limit 5",
dag=dag
)
解决方案
答案已经在这个线程中。下面是演示代码:
父代:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 4, 29),
}
dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')
leave_work = DummyOperator(
task_id='leave_work',
dag=dag,
)
cook_dinner = DummyOperator(
task_id='cook_dinner',
dag=dag,
)
leave_work >> cook_dinner
孩子达格:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 4, 29),
}
dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')
# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
task_id='wait_for_dinner',
external_dag_id='Parent_dag',
external_task_id='cook_dinner',
start_date=datetime(2020, 4, 29),
execution_delta=timedelta(hours=1),
timeout=3600,
)
have_dinner = DummyOperator(
task_id='have_dinner',
dag=dag,
)
play_with_food = DummyOperator(
task_id='play_with_food',
dag=dag,
)
wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
图片:
达格斯
Parent_dag
child_dag
推荐阅读
- powershell - Powershell RDS CAL 电子邮件报告
- typescript - 如何从 typescript 访问 vue 中组件的 props
- pixi.js - PIXI Sprite 显示为黑色
- php - Codeigniter 显示产品到期日期而不是库存
- arrays - Vapor 4/Fluent,具有嵌套数据库查找以返回返回 EventLoopFuture 的函数
- python - 如何在 Pandas DataFrame 数据透视表的 Matplotlib 线图中显示更多类别?
- python - 为什么我在赋值之前会引用局部变量?
- terraform - 如何将字符串添加到 Terraform 中的变量?
- javascript - 将箭头函数添加到以行和列为参数的表格单元格
- java - Spring Data r2dbc - 实体继承