python - 如何在气流中动态创建任务
问题描述
我不知道如何在预定时间在气流中动态创建任务。My Dag 是在知道运行时需要多少任务之前创建的。即,在每个 dag 触发器上,我想传递要处理的目录以创建以下 Dag 的任务列表。
到目前为止我什么都想不出来
args = {
'owner': 'airflow',
'start_date': datetime(2004, 11, 12),
}
dag = DAG(
dag_id='dyn_test',
default_args=args,
schedule_interval='@once'
)
dir = '/home/uname/dir'
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
task1 = # change 'file' structure
task2 = # store changed 'file'
task1 >> task2
在这里,我应该如何在触发 Dag 时传递“dir”变量,以便 task1 和 task2 将根据“dir”中存在的文件数运行。
解决方案
您可以使用气流变量或环境变量。
# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")
# Using Env Vars
import os
dir1= os.environ["dir1"]
args = {
'owner': 'airflow',
'start_date': datetime(2004, 11, 12),
}
dag = DAG(
dag_id='dyn_test',
default_args=args,
schedule_interval='@once'
)
filesInDir = next(os.walk(dir))[2]
for file in filesInDir:
task1 = # change 'file' structure
task2 = # store changed 'file'
task1 >> task2
推荐阅读
- javascript - Babel 不预处理文件
- python - AssertionError:管理器项目的数量必须等于块项目的联合#管理器项目:6004,#tot_items:6005
- python - Python 3 Django Rest Framework - Manager 对象没有属性错误
- reactjs - SweetAlert2-react 取消按钮不起作用
- c# - Visual Studio 2019 C# Windows Forms App (.NET Framework) - 没有视图设计器或查看代码
- python - 使用opencv显示当前坐标圆而不是所有点
- sql - SQL - 获取行的 MIN 值并检查此 MIN 值是否在行中至少 2 次
- javascript - 如何解决 SassError:属性“enableProdMode”必须后跟“:”?
- c# - 记录使用单独程序执行的所有功能
- caching - 使用实体框架在 ASP.NET Core MVC 中的 IMemoryCache 中缓存实体本身