首页 > 解决方案 > 如何让一个airflow dag中的一些任务不运行,其余任务按照我输入的参数运行?

问题描述

我已经定义了一个具有一定数量不同任务的气流 dag,所有这些任务都与一个传递给 dag-define 函数的参数有关。所以当我向这个参数传递不同的值时,我会得到一个具有相同任务结构的不同 dag。

但是,我遇到一个问题:如何让其中一些任务在我将某个参数传递给 dag 时不运行,而当我传递其他参数时,所有这些任务都应该正常运行。

例如,dag 代码如下所示:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from libs.config.dag_config import Config
from etl.funcs import function1, function2, function3

def build_dag(id, schedule_interval):
    config = Config()
    config.add_params({'id': id,})
    with DAG(f'{id}_etl', default_args=config.default_args, schedule_interval=schedule_interval) as dag:
        start = DummyOperator(task_id='start')
        end = DummyOperator(task_id='end')
        task1 = PythonOperator(task_id='task1',
                               python_callable=function1,
                               provide_context=True)
        task2 = PythonOperator(task_id='task2',
                               python_callable=function2,
                               provide_context=True)
        task3 = PythonOperator(task_id='task3',
                               python_callable=function3,
                               provide_context=True)
        tasks = [task1, task2, task3]
        start >> tasks >> end
        return dag

dag1 = build_dag(10001, '0 * * * *')
dag2 = build_dag(10002, '0 * * * *')
dag3 = build_dag(10003, '0 * * * *')

对于dag1和dag2,我希望三个任务都正常运行,但是对于dag3,我只想运行task1和task2,不想运行task3。如何做到这一点?

标签: airflowairflow-scheduler

解决方案


推荐阅读