首页 > 解决方案 > Airflow 中的动态任务 ID 名称

问题描述

我有一个DataflowTemplateOperator可以处理不同 json 文件的 DAG。当我触发 dag 时,我通过传递一些参数{{dag_run.conf['param1']}}并且工作正常。

我遇到的问题是尝试task_id根据 param1 重命名。

IE task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

它只抱怨字母数字字符或

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), 它无法识别 dag_run 以及 alpha 问题。

这背后的整个想法是,当我在数据流作业控制台上看到作业失败时,我知道谁是基于 param1 的罪犯。数据流作业名称基于 task_id,如下所示:

df-operator-read-object-json-file-8b9eecec

我需要的是这个:

df-operator-read-object-param1-json-file-8b9eecec

如果可能的话,有什么想法吗?

标签: airflowgoogle-cloud-composer

解决方案


无需为每个文件生成新的运算符。 DataflowTemplatedJobStartOperator具有job_name参数,该参数也是模板化的,因此可以与 Jinja 一起使用。

我没有测试它,但这应该工作:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
        task_id="df_operator_read_object_json_file",
        job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
        template='gs://dataflow-templates/your_template',
        location='europe-west3',
    )

推荐阅读