python - Airflow BranchPythonOperator - 分支后继续
问题描述
我有以下运算符,如下所示。我能够直观地看到图形表示看起来是正确的。但是,我无法使功能正常工作,因为路径不会继续经过任何一个分支。无论日期如何,两条路径都不会继续到 task_05。
DAG 有两 (2) 条路径:
(1)如果是月初,task_01->test_step->task_02->task_05->task_06
(2)如果不是月初,task_01->test_step->task_03->task_04->task_05->task_06
问题:该功能无法让 DAG 一直完成到 task_06。
假设问题在于我使用按位运算符的方式。
代码
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import DAG
default_args = {
'owner': 'astro',
'depends_on_past': False,
'start_date': datetime(2020, 5, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='astro_test',
default_args=default_args,
schedule_interval="0 10 * * *",
catchup=True,
max_active_runs=1
)
def dom_branch(ds_nodash, **kwargs):
if ds_nodash[-2:] == '01':
return "task_02"
else:
return "task_03"
with dag:
one = DummyOperator(task_id='task_01')
two = DummyOperator(task_id='task_02')
three = DummyOperator(task_id='task_03')
four = DummyOperator(task_id='task_04')
five = DummyOperator(task_id='task_05')
six = DummyOperator(task_id='task_06')
dom_operator = BranchPythonOperator(
task_id=f"test_step",
provide_context=True,
op_kwargs={'ds_nodash': '{{ ds_nodash }}'},
python_callable=dom_branch
)
one >> dom_operator >> [two, three]
two >> five
three >> four >> five
five >> six
解决方案
这是因为它task_5
需要两个上游都完成才能开始。相反,您可以添加触发规则:
https ://airflow.apache.org/docs/stable/concepts.html#trigger-rules
five = DummyOperator(
task_id='task_05',
trigger_rule=TriggerRule.ONE_SUCCESS,
)
推荐阅读
- sql - BigQuery: LAG() on UNNEST()
- jquery - How can I get the src attributes of each image element with the same class in JQuery?
- android - TextureView 不支持显示背景可绘制 Xamarin Android
- python - 当任何进程完成时如何终止池?
- r - 如何使用 R 可视化 k-means 集群?
- r - 将 glmnet 与未来一起使用
- stream - 如何解释这两种广播流创作之间的区别?
- android - Flutter Gradle 文件中面临的问题
- python - Changing color roles discord
- docker - Is there way in docker compose to create the cross service constant?