首页 > 解决方案 > 在循环之后如何为一些独立的静态任务赋予任务依赖关系。空气流动

问题描述

我有一个 for 循环,我还有一些中间任务和循环之后的一些任务。

正如许多帖子中提到的那样,我只在 for 循环中提供任务依赖:

例子 :

individual_task1 = SSHOperator (task_id='tk_one'....)

individual_task2 = SSHOperator (task_id='tk_two'....)

individual_task3 = SSHOperator (task_id='tk_three'....)

  for i in [val1,val2,val3,val4.....valn]

    first_task_in_loop = SSHSparkSubmitOperator (task_id='comp_' + i,...)

    second_task_in_loop = SSHOperator(task_id='stats_' + i...)

    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2 >> individual_task3

但是对于 individual_task2 和 individual_task3 我得到错误:

破碎的 Dag ,, task_id 已经注册。

但这是一个未在循环中定义的单个任务,那么为什么我会收到这个错误或者我做错了什么?

标签: pythonairflow

解决方案


尝试这个:

individual_task1 = SSHOperator (task_id='tk_one'....)

individual_task2 = SSHOperator (task_id='tk_two'....)

individual_task3 = SSHOperator (task_id='tk_three'....)

  for i in [val1,val2,val3,val4.....valn]

    first_task_in_loop = SSHSparkSubmitOperator (task_id='comp_' + i,...)

    second_task_in_loop = SSHOperator(task_id='stats_' + i...)

    individual_task1 >> first_task_in_loop >> second_task_in_loop >> individual_task2

individual_task2 >> individual_task3

也许 Airflow 抱怨是因为您多次设置相同的任务流


推荐阅读