首页 > 解决方案 > 如何在气流中动态创建任务

问题描述

我不知道如何在预定时间在气流中动态创建任务。My Dag 是在知道运行时需要多少任务之前创建的。即,在每个 dag 触发器上,我想传递要处理的目录以创建以下 Dag 的任务列表。

到目前为止我什么都想不出来

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


dir = '/home/uname/dir'
filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2

在这里,我应该如何在触发 Dag 时传递“dir”变量,以便 task1 和 task2 将根据“dir”中存在的文件数运行。

标签: pythonairflow

解决方案


您可以使用气流变量或环境变量。

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")

# Using Env Vars
import os
dir1= os.environ["dir1"]

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2

推荐阅读