首页 > 解决方案 > 基于 dag_run conf 值的循环中的气流任务

问题描述

我正在尝试根据 dag_run conf 输入创建多个气流任务。conf 将有一个值数组,每个值都需要生成一个任务。该任务又需要将值传递给它的可调用函数。像这样的东西:

 #create this task in a loop
 task = PythonOperator(task_id="fetch_data", python_callable=fetch_data(value from array), retries=10)

Conf 的值如下:

{"fruits":["apple","kiwi","orange"]}

我认为这可以通过以下方式访问:

kwargs['dag_run'].conf('fruits')

如何在运算符之外访问此值,然后在循环中创建运算符?

标签: pythonloopstaskairflowkeyword-argument

解决方案


您可以将 PythonOperator 实例化包装在使用值列表的 for 循环中。

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

from datetime import datetime

dag = DAG(
    dag_id='fruit_name_printer',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@once'
)

input = [
    'apple',
    'orange',
    'banana'
]


def call_func(fruit_name):
    print(fruit_name)


with dag:
    for fruit in input:
        printer = PythonOperator(
            task_id=f'print_{fruit}',
            python_callable=call_func,
            op_kwargs={
                'fruit_name': fruit
            }
        )

推荐阅读