首页 > 解决方案 > 基于表依赖的动态dag创建

问题描述

我正在从包含要执行的任务的表中读取,并且我还将依赖项存储在同一个表中。我正在将表格读入熊猫数据框。

这是我读入熊猫数据框的表格

我的任务 3 依赖于任务 1 和任务 2,而任务 4 依赖于任务 3 才能完成。

for index, row in odf.iterrows():

 dag_id = row["DAG_ID"]
 task_id = row["TASK_ID"]
 task_name = row["TASK_NAME"]
 script_name = row["SCRIPT_NAME"]
 if row["DEPENDENT_ID"] is not None:
   dependents = row["DEPENDENT_ID"].split('|')
   print(dependents)

   t1 = OracleOperator(task_id=task_name,
                   oracle_conn_id='oracle_con',
                   sql='Begin %s; end;' % script_name, dag=dag)

   for d in dependents:

     for index, row in odf[odf["TASK_ID"] == int(d)].iterrows():
          t2 =  OracleOperator(task_id=row["TASK_NAME"],
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin %s; end;' %script_name,dag = dag)
          t1.set_upstream(t2)

但我的输出无法按预期出现,以下是我所看到的。 我知道我在循环中做错了什么

我知道我可以做这样的事情。

t1 = OracleOperator(task_id='run_proc_ihn_reference_raw',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task1; end;',dag = dag)

t2 = OracleOperator(task_id='run_proc_aim_codelist_raw',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task2; end;',dag = dag)

t3 = OracleOperator(task_id='run_proc_decline_reason_dim_build',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task3; end;',dag = dag)

t4 = OracleOperator(task_id='run_proc_decline_reason_dim_load',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task4; end;',dag = dag)

(t1,t2) >> t3 >> t4

但我可能有超过 100 个程序,因此使用上述方法寻找要使用依赖项创建的 dag。

同样需要帮助。谢谢

标签: airflow

解决方案


在处理涉及复杂依赖项的大量任务时,我发现我通常最终会重复相当多的“任务样板”,正如您在示例中所示。

在这些情况下,我喜欢让 Python 在创建任务并将它们连接起来时做“繁重的工作”:

default_args = {
    "oracle_conn_id": "oracle_con"
}

task_dict = {
    "ihn_reference_raw":        {"proc": "task1"},
    "aim_codelist_raw":         {"proc": "task2"},
    "decline_reason_dim_build": {"proc": "task3", 
                                 "upstream": ["ihn_reference_raw", 
                                              "aim_codelist_raw"]},
    "decline_reason_dim_load":  {"proc": "task4", 
                                 "upstream": ["decline_reason_dim_build"]}
}

...

with DAG(
    ..., 
    default_args=default_args
) as dag:

    # Iterate the details to create the tasks
    for task_id, details in task_dict.items():
        OracleOperator(task_id=f"run_proc_{task_id}",
                       sql=f"BEGIN {details['proc']}; END;")

    # Iterate a second time to "wire up" the upstream tasks.
    for task_id, details in task_dict.items():
        if task_up := details.get("upstream"):
            dag.get_task(f"run_proc_{task_id}").set_upstream(task_up)

(为简洁起见,我省略了很多,但想法就在那里)

关键是找到流程中重复的部分,存储每个任务独有的东西(在我们task_dict的示例中),然后循环构建。


推荐阅读