首页 > 解决方案 > Is there way to call task within the task in Airflow?

问题描述

I have the following logic:

def function_1():
    if condition_1 == True:
        function_2()
    else:
        function_3()
        function_2()

Is there any way to call the task within the task in Airflow?

标签: pythonairflowairflow-scheduler

解决方案


You can achieve the same using BranchPythonOperator as below.

    def function_1(**kwargs):
      if condition_1 == True :
        return 'function_2'
      else:
        return 'function_3'

    check_task = BranchPythonOperator(
      task_id='check_task',
      python_callable=function_1, # defined above method holds the branching condition
      provide_context=True,
      dag=dag
    )

    function_2_task = BashOperator(
      task_id= 'function_2',
      bash_command="echo function_2 task executed",
      dag=dag
    )

    function_3_task = BashOperator(
      task_id= 'function_3',
      bash_command="echo function_3 task executed",
      dag=dag
    )

    function_2_task.set_upstream(check_task)
    function_3_task.set_upstream(check_task)
    function_3_task.set_upstream(function_2_task)

Tasks Graph View


推荐阅读