首页 > 解决方案 > 从 trigger_dag_run_operator 设置任务 ID

问题描述

在以下气流查询中需要帮助:

无法设置使用 trigger_dag_run_operator 启动的任务 ID:

下面是我的触发 dag 运行运算符和目标 python 运算符: TriggerDag 运算符:

    ...
    trigger_target = TriggerDagRunOperator(
        task_id='trigger_target',
        trigger_dag_id='TargetDag',
        conf={"message": "Test Message", "executed_file_name":"DAG_NAME_001"},
    )
    ...

目标 dag 运算符:

    trigger_pipeline = PythonOperator(
        task_id='called_for_file'+{{dag_run['conf']['executed_file_name']}},
        python_callable=call_trigger_pipeline,
    )

在上面的代码中,“{{dag_run['conf']['executed_file_name']}}”没有被替换为触发器 dag run 运算符中设置的值。

谢谢,杰克

标签: airflow

解决方案


该代码的问题在于task_id它不是模板化字段,因此 Jinja 不会被渲染,这就解释了为什么你得到的输出包括大括号,这是预期的行为。

在不了解更多上下文的情况下,我认为您应该考虑一种不同的设计,其中任务不是动态生成的,而是动态生成DAG的。按照Airflow FAQs中的模式,您可以动态创建 DAG 和其他任务,请考虑以下示例:

def create_dags(city_name, payload: list, default_args):
    """
    Returns a DAG object
    """

    def _print_load_number(city_name, load_number):
        print(f"{load_number} from: {city_name} ")

    dag = DAG(
        f"location_sync_{city_name}",
        schedule_interval="@daily",
        catchup=False,
        tags=["example", "dynamic_dag"],
        default_args=default_args,
    )

    with dag:
        end = DummyOperator(task_id="end")
        for load_no in payload:
            print_load = PythonOperator(
                task_id=f"{dag_id}_proccesing_load_{load_no}",
                python_callable=_print_load_number,
                op_kwargs={"city_name": city, "load_number": load_no},
            )
            print_load >> end

    # DAG level tasks dependencies
    return dag

cities = [
    {"name": "London", "payload": [1, 2, 3]},
    {"name": "Paris", "payload": [4, 5, 6]},
    {"name": "Buenos_Aires", "payload": [4, 5, 6]},
]

default_args = {"owner": "Airflow", "start_date": days_ago(1)}

for city in cities:
    dag_id = city["name"]

    globals()[dag_id] = create_dags(city["name"], city["payload"], default_args)

请注意,在create_dag函数中,任务是动态创建的,并且每个任务都task_id根据提供的值命名:task_id=f"{dag_id}_proccesing_load_{load_no}"

一旦您创建了n 个DAG,您就可以根据需要处理触发它们,包括使用TriggerDagRunOperator另一个 DAG,这将允许定义(动态)dag_id要触发的。您甚至可以iterable在创建 DAG 时循环使用相同的内容。

此外,由于trigger_dag_id是模板化字段,如果您需要定义要从 UI 或 CLI 触发的 DAG,您可以params像这样使用宏:

trigger_service_discovery = TriggerDagRunOperator(
    task_id='trigger_loc_sync',
    trigger_dag_id='location_sync_{}'.format('{{ params.dag_id_from_UI }}'),
    wait_for_completion=True,
)

上例中的一个 DAG 的图表视图:

图视图

查看天文学家指南以进一步阅读有关动态 DAG 生成的信息。


推荐阅读