airflow - 从 trigger_dag_run_operator 设置任务 ID
问题描述
在以下气流查询中需要帮助:
无法设置使用 trigger_dag_run_operator 启动的任务 ID:
下面是我的触发 dag 运行运算符和目标 python 运算符: TriggerDag 运算符:
...
trigger_target = TriggerDagRunOperator(
task_id='trigger_target',
trigger_dag_id='TargetDag',
conf={"message": "Test Message", "executed_file_name":"DAG_NAME_001"},
)
...
目标 dag 运算符:
trigger_pipeline = PythonOperator(
task_id='called_for_file'+{{dag_run['conf']['executed_file_name']}},
python_callable=call_trigger_pipeline,
)
在上面的代码中,“{{dag_run['conf']['executed_file_name']}}”没有被替换为触发器 dag run 运算符中设置的值。
谢谢,杰克
解决方案
该代码的问题在于task_id
它不是模板化字段,因此 Jinja 不会被渲染,这就解释了为什么你得到的输出包括大括号,这是预期的行为。
在不了解更多上下文的情况下,我认为您应该考虑一种不同的设计,其中任务不是动态生成的,而是动态生成DAG
的。按照Airflow FAQs中的模式,您可以动态创建 DAG 和其他任务,请考虑以下示例:
def create_dags(city_name, payload: list, default_args):
"""
Returns a DAG object
"""
def _print_load_number(city_name, load_number):
print(f"{load_number} from: {city_name} ")
dag = DAG(
f"location_sync_{city_name}",
schedule_interval="@daily",
catchup=False,
tags=["example", "dynamic_dag"],
default_args=default_args,
)
with dag:
end = DummyOperator(task_id="end")
for load_no in payload:
print_load = PythonOperator(
task_id=f"{dag_id}_proccesing_load_{load_no}",
python_callable=_print_load_number,
op_kwargs={"city_name": city, "load_number": load_no},
)
print_load >> end
# DAG level tasks dependencies
return dag
cities = [
{"name": "London", "payload": [1, 2, 3]},
{"name": "Paris", "payload": [4, 5, 6]},
{"name": "Buenos_Aires", "payload": [4, 5, 6]},
]
default_args = {"owner": "Airflow", "start_date": days_ago(1)}
for city in cities:
dag_id = city["name"]
globals()[dag_id] = create_dags(city["name"], city["payload"], default_args)
请注意,在create_dag
函数中,任务是动态创建的,并且每个任务都task_id
根据提供的值命名:task_id=f"{dag_id}_proccesing_load_{load_no}"
一旦您创建了n 个DAG,您就可以根据需要处理触发它们,包括使用TriggerDagRunOperator
另一个 DAG,这将允许定义(动态)dag_id
要触发的。您甚至可以iterable
在创建 DAG 时循环使用相同的内容。
此外,由于trigger_dag_id
是模板化字段,如果您需要定义要从 UI 或 CLI 触发的 DAG,您可以params
像这样使用宏:
trigger_service_discovery = TriggerDagRunOperator(
task_id='trigger_loc_sync',
trigger_dag_id='location_sync_{}'.format('{{ params.dag_id_from_UI }}'),
wait_for_completion=True,
)
上例中的一个 DAG 的图表视图:
查看天文学家指南以进一步阅读有关动态 DAG 生成的信息。
推荐阅读
- r - 如何根据满足的条件和时间顺序对 r 中的数据进行排序?
- angular - 实际处理来自服务器的 Angular HTTP 404 错误,这表示没有记录的有效响应
- php - wp_remote_get:获取接收到的数据
- swift - 快速减少表格视图部分之间的差距
- google-bigquery - 将一组变量和值与最近的日期时间匹配
- macos - UndoManager.setActionName 未显示在菜单中(SwiftUI macOS)
- jquery - 在Highcharts中,当图表发生变化时,如何随图表一起移动注释?
- python - 一旦我的函数嵌套并相互引用,我的元组返回 NoneType 错误,为什么?
- java - Tomcat数据库连接池高吞吐速度慢
- html - 你怎么称呼这些滚动效果,我如何在 Wordpress 中做到这一点?