python - Apache Airflow:如何动态触发多个 DAG?
问题描述
我有 2 个 DAG:1. DAG1 - 跑步者 2. DAG2 - 管道
我需要在 DAG1 中获取一些 JSON 端点,获取一个包含 N 个项目的数组response.data.items
,并通过传递的每个项目触发 DAG2item.somedata
怎么做?
更新。我试过了
dag = DAG(
'fetch-1',
default_args=default_args,
description='Fetching emails',
schedule_interval=timedelta(days=1),
)
t1 = DummyOperator(
task_id='start',
dag=dag,
)
r = requests.post('http://host.docker.internal:8080/fetch-emails')
j = r.json()
for _, msg in j.data.messages:
tx = DummyOperator(
task_id='email_pipeline_{}'.format(msg.id),
dag=dag,
)
t1 >> tx
解决方案
您正在尝试运行 dag,同时使用该 dag 的中间结果来创建自身。这是不可能的。
相反,您可以做的是,从 dag1,使用此处dag_runs
描述的端点触发多个 dag2 运行。
推荐阅读
- r - 将数字 NA 和 0 替换为空白,所有大于 0 的值替换为“X”
- firebase - 如何在单个项目中为我的每个 Firebase 站点设置谷歌分析?
- wordpress - WordPress 主题中的航点仅在打开开发人员工具时有效
- c# - 新统一,为什么这个脚本不移动对象?(统一 3D)
- c# - 将自定义反序列化程序发布到 Azure 流分析
- android - Jetpack Navigation NavigationDrawer,菜单项的自定义操作
- r - 这个箱线图缺口正确吗?
- node.js - How to call Terminal Commands in a TEXT File?
- java - 如何创建 Firebase 短动态链接
- artifactory - JCenter maven-metadata.xml 没有来自 Maven Central 的所有模块版本