airflow - Airflow 中的动态任务 ID 名称
问题描述
我有一个DataflowTemplateOperator
可以处理不同 json 文件的 DAG。当我触发 dag 时,我通过传递一些参数{{dag_run.conf['param1']}}
并且工作正常。
我遇到的问题是尝试task_id
根据 param1 重命名。
IE task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",
它只抱怨字母数字字符或
task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']),
它无法识别 dag_run 以及 alpha 问题。
这背后的整个想法是,当我在数据流作业控制台上看到作业失败时,我知道谁是基于 param1 的罪犯。数据流作业名称基于 task_id,如下所示:
df-operator-read-object-json-file-8b9eecec
我需要的是这个:
df-operator-read-object-param1-json-file-8b9eecec
如果可能的话,有什么想法吗?
解决方案
无需为每个文件生成新的运算符。
DataflowTemplatedJobStartOperator
具有job_name
参数,该参数也是模板化的,因此可以与 Jinja 一起使用。
我没有测试它,但这应该工作:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
task_id="df_operator_read_object_json_file",
job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
template='gs://dataflow-templates/your_template',
location='europe-west3',
)
推荐阅读
- javascript - 缺少 /node_modules/fibers/bin/linux-x64-v8-7.6/fibers.node `。尝试重新安装“node-fibers”?
- heroku - 如何使用自定义 SSL 正确链接 Heroku、CloudFront、Route53?
- delphi - 如何将 Delphi OleVariant 内容转储到文件中?
- flutter - firebase_crashlytics - 是什么触发了报告的上传?
- javascript - 为什么为输入设置值在 Whatsapp 上不起作用?
- nuget - NuGet v4 contentFiles 没有被复制到输出
- python - 具有 epsilon-greedy 策略的 n 臂老虎机
- wordpress - 按字母组分组名称 - Twig、ACF
- powershell - 我需要编写脚本来每周自动更改服务器上的 IP 地址,并将最后一个八位字节加一
- spring-boot - JHipster 单体构建生产失败