airflow - 气流:依赖于自身首先完成的动态任务
问题描述
我需要创建一个 DAG 来删除和更新几个不同的表。更新按地区进行。我使用的数据库在执行任何删除或更新时都会锁定表,因此我需要像下面那样构建我的 dag,以避免尝试同时更新同一个表。
-->
等于dependent on
Florida_table_1 --> Carolina_table_1 --> Texas_table_1
Florida_table_2 --> Carolina_table_2 --> Texas_table_2
Florida_table_3 --> Carolina_table_3 --> Texas_table_3
更糟糕的是,我可以单独写出所有任务,但我想知道是否有一种聪明的方法可以动态地完成它?
解决方案
我会这样:
list_of_states = ["Alabama", "Alaska", "Arizona" ...] # I forgot the song...
def state_task(which_state):
print(f"Working on {which_state}!")
[...]
with DAG(dag_id="states_process", ...) as dag:
prior_task = the_start = DummyOperator(task_id="the_start")
for which_state in list_of_states:
prior_task = prior_task >> PythonOperator(
task_id=f"{which_state}_task",
python_callable=state_task,
op_args=(which_state,)
)
这不是我的想法,但这个概念基本上是利用 Airflow 的>>
语法来声明上游并返回我们保存的任务以用作下一个的上游:prior_task = prior_task >> PythonOperator
推荐阅读
- laravel - Laravel 5.6 哈希
- javascript - Uncaught (in promise) TypeError: Cannot read property of 'stations' of undefined
- google-apps-script - 多个 onEdit 脚本,但在第 2 行出现错误
- amazon-web-services - 根据 AWS 中的 IP 地址将流量路由到不同的实例
- java - 在 itext 分离签名中,如何将 SignatureAppearance 和 OutputStream 对象存储在文件系统而不是会话上?
- c# - svcutil 更改类型所以所有的都是一样的
- http - 如何使用 Electron 绕过具有复杂媒体类型的 HTTP 帖子中的预检?
- sql - 从较新的行中选择日期
- c# - 实体框架 - 字符串列的唯一约束
- java - 无法从 START_OBJECT 令牌中反序列化 int[] 的实例