首页 > 解决方案 > 气流:依赖于自身首先完成的动态任务

问题描述

我需要创建一个 DAG 来删除和更新几个不同的表。更新按地区进行。我使用的数据库在执行任何删除或更新时都会锁定表,因此我需要像下面那样构建我的 dag,以避免尝试同时更新同一个表。

-->等于dependent on

Florida_table_1 --> Carolina_table_1 --> Texas_table_1
Florida_table_2 --> Carolina_table_2 --> Texas_table_2
Florida_table_3 --> Carolina_table_3 --> Texas_table_3

更糟糕的是,我可以单独写出所有任务,但我想知道是否有一种聪明的方法可以动态地完成它?

标签: airflow

解决方案


我会这样:

list_of_states = ["Alabama", "Alaska", "Arizona" ...] # I forgot the song...

def state_task(which_state):
    print(f"Working on {which_state}!")
    [...]


with DAG(dag_id="states_process", ...) as dag:
    prior_task = the_start = DummyOperator(task_id="the_start")
    for which_state in list_of_states:
        prior_task = prior_task >> PythonOperator(
            task_id=f"{which_state}_task", 
            python_callable=state_task,
            op_args=(which_state,)
        )

这不是我的想法,但这个概念基本上是利用 Airflow 的>>语法来声明上游并返回我们保存的任务以用作下一个的上游:prior_task = prior_task >> PythonOperator


推荐阅读