首页 > 解决方案 > 通过循环列表并传递参数在 Airflow 中创建任务

问题描述

编辑: 这会起作用,我定义了导致问题的 ex_func_airflow(var_1 = i)

我想通过在列表上循环来在气流中创建任务。

tabs = [1,2,3,4,5]
for i in tabs:
    task = PythonOperator(
    task_id = name,
    provide_context=False,
    op_args  = [i],
    python_callable=ex_func_airflow,
    dag=dag)
    task_0 >> task >> task_1

当它在气流中运行时,传递的参数始终是该列表中的最后一个元素。

所以我基本上是在运行:

ex_func_airflow(6) 

五次而不是跑步

ex_func_airflow(1)
ex_func_airflow(2)
ex_func_airflow(3)

..ETC。

我如何才能为每个任务传递正确的参数?

标签: airflowairflow-scheduler

解决方案


以下代码对我有用。

def print_context(ds, **kwargs):
    print("hello")


def ex_func_airflow(i):
    print(i)


dag = DAG(
    dag_id="loop_dag",
    schedule_interval=None,
    start_date=datetime(2018, 12, 31),
)

task_0 = PythonOperator(
    task_id='task_0',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

task_1 = PythonOperator(
    task_id='task_1',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

tabs = [1, 2, 3, 4, 5]
for i in tabs:
    task_id = f'task_tab_{i}'
    task = PythonOperator(
        task_id=task_id,
        provide_context=False,
        op_args=[i],
        python_callable=ex_func_airflow,
        dag=dag)
    task_0 >> task >> task_1

推荐阅读