首页 > 解决方案 > 如何传递动态参数气流运算符?

问题描述

我正在使用 Airflow 在 Google Cloud Composer 上运行 Spark 作业。我需要

使用 Airflow API - 我可以读取 YAML 文件,并使用 xcom 跨任务推送变量。

但是,考虑到DataprocClusterCreateOperator()

和其他一些参数被标记为模板。

如果我想将其他参数作为模板传递(目前不是这样)怎么办?- 像image_versionnum_workers等等worker_machine_type

有什么解决方法吗?

标签: google-cloud-platformgoogle-cloud-composerairflow

解决方案


不确定“动态”是什么意思,但是当 yaml 文件更新时,如果读取文件过程在 dag 文件正文中,则 dag 将被刷新以从 yaml 文件中申请新的 args。所以实际上,您不需要 XCOM 来获取参数。只需简单地创建一个 params 字典,然后传递给 default_args:

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'mage_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needs params here.
}

DAG = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTEVAL,
    default_args=default_args, # pass the params to DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=DAG
)

但是如果你想要动态的 dag 而不是参数,你可能需要像这样的其他策略。

所以你可能需要弄清楚基本概念:动态在哪个级别?任务级别?DAG级别?

或者您可以创建自己的 Operator 来完成这项工作并获取参数。


推荐阅读