airflow - 基于表依赖的动态dag创建
问题描述
我正在从包含要执行的任务的表中读取,并且我还将依赖项存储在同一个表中。我正在将表格读入熊猫数据框。
我的任务 3 依赖于任务 1 和任务 2,而任务 4 依赖于任务 3 才能完成。
for index, row in odf.iterrows():
dag_id = row["DAG_ID"]
task_id = row["TASK_ID"]
task_name = row["TASK_NAME"]
script_name = row["SCRIPT_NAME"]
if row["DEPENDENT_ID"] is not None:
dependents = row["DEPENDENT_ID"].split('|')
print(dependents)
t1 = OracleOperator(task_id=task_name,
oracle_conn_id='oracle_con',
sql='Begin %s; end;' % script_name, dag=dag)
for d in dependents:
for index, row in odf[odf["TASK_ID"] == int(d)].iterrows():
t2 = OracleOperator(task_id=row["TASK_NAME"],
oracle_conn_id='oracle_con',
sql= 'Begin %s; end;' %script_name,dag = dag)
t1.set_upstream(t2)
我知道我可以做这样的事情。
t1 = OracleOperator(task_id='run_proc_ihn_reference_raw',
oracle_conn_id='oracle_con',
sql= 'Begin proc.task1; end;',dag = dag)
t2 = OracleOperator(task_id='run_proc_aim_codelist_raw',
oracle_conn_id='oracle_con',
sql= 'Begin proc.task2; end;',dag = dag)
t3 = OracleOperator(task_id='run_proc_decline_reason_dim_build',
oracle_conn_id='oracle_con',
sql= 'Begin proc.task3; end;',dag = dag)
t4 = OracleOperator(task_id='run_proc_decline_reason_dim_load',
oracle_conn_id='oracle_con',
sql= 'Begin proc.task4; end;',dag = dag)
(t1,t2) >> t3 >> t4
但我可能有超过 100 个程序,因此使用上述方法寻找要使用依赖项创建的 dag。
同样需要帮助。谢谢
解决方案
在处理涉及复杂依赖项的大量任务时,我发现我通常最终会重复相当多的“任务样板”,正如您在示例中所示。
在这些情况下,我喜欢让 Python 在创建任务并将它们连接起来时做“繁重的工作”:
default_args = {
"oracle_conn_id": "oracle_con"
}
task_dict = {
"ihn_reference_raw": {"proc": "task1"},
"aim_codelist_raw": {"proc": "task2"},
"decline_reason_dim_build": {"proc": "task3",
"upstream": ["ihn_reference_raw",
"aim_codelist_raw"]},
"decline_reason_dim_load": {"proc": "task4",
"upstream": ["decline_reason_dim_build"]}
}
...
with DAG(
...,
default_args=default_args
) as dag:
# Iterate the details to create the tasks
for task_id, details in task_dict.items():
OracleOperator(task_id=f"run_proc_{task_id}",
sql=f"BEGIN {details['proc']}; END;")
# Iterate a second time to "wire up" the upstream tasks.
for task_id, details in task_dict.items():
if task_up := details.get("upstream"):
dag.get_task(f"run_proc_{task_id}").set_upstream(task_up)
(为简洁起见,我省略了很多,但想法就在那里)
关键是找到流程中重复的部分,存储每个任务独有的东西(在我们task_dict
的示例中),然后循环构建。
推荐阅读
- python-3.x - Pymongo 无法使用变量查询
- java - 如何从 txt 文件中的单行读取值
- qml - QML Plasma 小部件:DropShadow 导致持续警告
- css - CSS Grid 是否允许带有 grid-template-rows 和 grid-area 的 grid-auto-columns?
- javascript - 如何将 API 数据设置为带有 lit 的内部 html
- javascript - 如何检测自动完成列表鼠标悬停并隐藏占位符范围
- python - 如果我需要跳过其中的一些字符,如何转换列表列表中的列表?
- xml - 清单或策略中的激活上下文生成失败错误 .invalid xml 语法
- c# - 如何对特定列应用过滤?
- apache-flink - 是否有反映 Flink 保存点的 API?