首页 > 解决方案 > 带有配置/参数 json 的 Airflow DAG 并循环到该参数以生成运算符

问题描述

我有一个手动触发的 dag。它需要一个参数,如:

{"id_list":"3,5,1"}

在 DAG 中,我根据这个整数列表动态创建运算符:

for id in id_list:
   task = create_task(id)

我需要id_list根据id_list. 由于不在模板字段中时无法直接引用该参数,如何初始化该列表?这就是我想在图形视图中看到它的方式,其中流程任务基于id_list参数。

在此处输入图像描述

我已经看到了动态创建任务的示例,但它们并不是真正动态的,因为列表值是硬编码的。如果有意义的话,这些任务是根据硬编码值列表动态创建的。

标签: airflow

解决方案


首先,创建固定数量的任务来执行。此示例使用 PythonOperator。在 中python_callable,如果index小于 then 的长度,param_list则执行 else raiseAirflowSkipException

        def execute(index, account_ids):
            param_list = account_ids.split(',')
            if index < len(param_list):
                print(f"execute task index {index}")
            else:
                raise AirflowSkipException


        def create_task(task_id, index):
            return PythonOperator(task_id=task_id,
                                  python_callable=execute,
                                  op_kwargs={
                                      "index": index,
                                      "account_ids": "{{ dag_run.conf['account_ids'] }}"}
                                  )

        record_size_limit = 5
        ACCOUNT_LIST = [None] * record_size_limit

        for idx in range(record_size_limit):
            task = create_task(f"task_{idx}", idx)
            task

触发 DAG 并将其作为参数传递:

在此处输入图像描述

图表视图:

在此处输入图像描述


推荐阅读