首页 > 解决方案 > Apache Airflow:如何动态触发多个 DAG?

问题描述

我有 2 个 DAG:1. DAG1 - 跑步者 2. DAG2 - 管道

我需要在 DAG1 中获取一些 JSON 端点,获取一个包含 N 个项目的数组response.data.items,并通过传递的每个项目触发 DAG2item.somedata

怎么做?

更新。我试过了

dag = DAG(
    'fetch-1',
    default_args=default_args,
    description='Fetching emails',
    schedule_interval=timedelta(days=1),
)

t1 = DummyOperator(
  task_id='start',
  dag=dag,
)

r = requests.post('http://host.docker.internal:8080/fetch-emails')
j = r.json()


for _, msg in j.data.messages:
  tx = DummyOperator(
    task_id='email_pipeline_{}'.format(msg.id),
    dag=dag,
  )
  t1 >> tx

标签: pythonairflow

解决方案


您正在尝试运行 dag,同时使用该 dag 的中间结果来创建自身。这是不可能的。

相反,您可以做的是,从 dag1,使用此处dag_runs描述的端点触发多个 dag2 运行。


推荐阅读