airflow - 通过字典动态循环创建气流 dag
问题描述
我正在尝试通过字典键动态循环添加气流 dag 并将键分配为 dag 名称。
dags 创建良好,但我得到:“此 DAG 在网络服务器 DagBag 对象中不可用。它显示在此列表中,因为调度程序在元数据数据库中将其标记为活动”并且它不可点击。
def create_dag(dag_id):
args = build_default_args(config_file)
dag = DAG(dag_id,schedule_interval='30 11 * * *', default_args=args)
with dag:
init_task = BashOperator(
task_id='test_init_task',
bash_command='echo "task"',
dag=dag
)
init_task
return dag
def get_data(**kwargs):
my_list=[]
file = open("/home/airflow/gcs/data/test.json")
data=json.load(file)
return data
data1 = data()
for dict in data1:
for pair in dict.items():
key , value = pair
print "key",ls_table ,"value",metrics
dag_id = '{}'.format(key)
default_args = {'owner': 'airflow',
'start_date': datetime(2019, 6, 18)
}
schedule = '@daily'
globals()[dag_id] = create_dag(dag_id)
解决方案
推荐阅读
- java - 在 Spring Boot 中实现 TDD,我不能在控制器中使用 @valid 注释
- android - 如何在矢量可绘制图像中添加圆形边框?
- jenkins - 在 Jenkinsfile 中使用“findFiles”,通过后缀匹配多个文件,使用 glob (ant glob)
- python - 网状不在docker内部创建多个线程?
- python - 无法导入python模块
- java - 在 JGroups 配置中放置 test-jgroups-jdbc_ping.xml jgroup 文件的位置
- python - 来自熊猫列切片的新列
- python - 如何在 discord.py 中删除机器人分页响应
- visual-studio-code - 从 vscode 打开自定义 url
- java - Selenium - 如何获得以下兄弟姐妹?