python - BranchOperator 正在跳过气流
问题描述
我有这个流程: -
execution_date_hour = "{{ execution_date.strftime('%H') }}"
default_args = {
'owner': 'hourly-airflow',
'depends_on_past': False,
'catch_up': False,
'start_date': days_ago(1),
'email': failure_email_list,
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('hourly_pipeline_dag',
default_args=default_args,
tags=['hourly'],
schedule_interval='@hourly',
catchup=False)
taskA = PostgresOperator(dag=dag,
task_id='taskA',
postgres_conn_id='database_connection',
sql='sql/hourly_entry.sql')
taskb = DummyOperator(
dag=dag,
task_id="taskb"
)
taske = DummyOperator(
dag=dag,
task_id="taske"
)
taskc = DummyOperator(
dag=dag,
task_id="taskc"
)
taskd = DummyOperator(
dag=dag,
task_id="taskd"
)
branch_op = BranchPythonOperator(
task_id='branch_op',
python_callable=lambda
**kwargs: 'feed_sensor_a' if execution_date_hour == '5' else 'feed_sensor_b',
dag=dag)
feed_sensor_a = SqlSensor(dag=dag,
task_id='feed_sensor_a',
conn_id='database_connection',
sql='sql/sensor_hourly.sql',
poke_interval=30,
trigger_rule=TriggerRule.ONE_SUCCESS,
timeout=3600)
feed_sensor_b = SqlSensor(dag=dag,
task_id='feed_sensor_b',
conn_id='database_connection',
sql='sql/sensor.sql',
poke_interval=30,
trigger_rule=TriggerRule.ONE_SUCCESS,
timeout=3600)
taskA >> [taskb,taskc]
taskb >> taskd
taskc >> taske
[taskd,taske] >> branch_op
branch_op >> [feed_sensor_a,feed_sensor_b]
管道一直运行到 taskd 和 taske,branch_op 被跳过。请帮忙,我被困在这个问题上太久了。直到 taske 和 taskd 运行良好,branch_op 以红色突出显示,即跳过,不知道这里发生了什么。(所有这些任务都是虚拟任务,实际上它们是 HttpOperator 和 Postgres op)。提前感谢,如果需要任何其他信息,请告诉我。
解决方案
运行您的代码我没有看到branch_op
任务失败或被跳过。但是,我认为您的BranchPythonOperator
任务不会像您希望的那样工作。没有输入传递到 lambda 函数,python_callable
也不是运算符的模板化字段(即,逻辑正在评估文字字符串"{{ execution_date.strftime('%H') }}"
,因此流程将始终跟随feed_sensor_b
。试试这个:
branch_op = BranchPythonOperator(
task_id="branch_op",
python_callable=lambda execution_date_hour: "feed_sensor_a" if execution_date_hour == "5" else "feed_sensor_b",
op_args=[execution_date_hour],
dag=dag,
)
推荐阅读
- python - 熊猫未能分离 csv 文件的列
- css - 如何获得未定义 CSS 变量的警告
- php - 将包含图像的文本列转换为 BLOB 列会使事情中断
- sql - 在 SQL WHERE...IN 子句中包含连接字段
- tinyos - 如何在 TinyOS 中测试多任务处理?
- nlp - 如何自定义 spaCy 的分词器以防止拆分由正则表达式描述的短语
- php - 将数组插入 PDO (PHP):“SQLSTATE[HY093]: Invalid parameter number”
- c# - C# 从列表中返回一个随机字符串
- python - 使用两个函数时出现“NameError: name '' is not defined”
- mysql - 在 MySQL/MariaDB 中创建过程并定义变量导致错误 #1064