Best way to skip tasks in Airflow?

Problem description

I'm trying to run tasks independently and in parallel.

It looks like this:

                                      ---> patternA   ---> file1a
                                                      ---> file2a
                                                      ---> file3a
sensor ---> move_csv ---> result_mv                                   ---> rerun_dag

                                      ---> patternB   ---> file1b
                                                      ---> file2b
                                                      ---> file3b

My dag.py:

# Imports assumed from the Airflow 1.x-style syntax used in the question
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

sensor = FileSensor(
    task_id="sensor",
    filepath=filePath,
    fs_conn_id='airflow_db',
    poke_interval=10,
    dag=dag,
)
move_csv = BranchPythonOperator(
    task_id='move_csv',
    python_callable=moveCsvFile,
    trigger_rule='none_failed',
    dag=dag,
)
result_mv = BranchPythonOperator(
    task_id='result_mv',
    python_callable=result,
    trigger_rule='none_failed',
    dag=dag,
)
patternA = DummyOperator(
    task_id="pattern_A",
    dag=dag,
)
patternB = DummyOperator(
    task_id="pattern_B",
    dag=dag,
)
file1a = BashOperator(
    task_id="file1a",
    bash_command='python3 ' + scriptPath + 'file1.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
file2a = BashOperator(
    task_id="file2a",
    bash_command='python3 ' + scriptPath + 'file2.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
file3a = BashOperator(
    task_id="file3a",
    bash_command='python3 ' + scriptPath + 'file3.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
file1b = BashOperator(
    task_id="file1b",
    bash_command='python3 ' + scriptPath + 'file1b.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
file2b = BashOperator(
    task_id="file2b",
    bash_command='python3 ' + scriptPath + 'file2b.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
file3b = BashOperator(
    task_id="file3b",
    bash_command='python3 ' + scriptPath + 'file3b.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)
move_csv.set_upstream(sensor)
result_mv.set_upstream(move_csv)
patternA.set_upstream(result_mv)
patternB.set_upstream(result_mv)
file1a.set_upstream(patternA)
file2a.set_upstream(patternA)
file3a.set_upstream(patternA)
file1b.set_upstream(patternB)
file2b.set_upstream(patternB)
file3b.set_upstream(patternB)
rerun.set_upstream([file1a, file2a, file3a, file1b, file2b, file3b])  # rerun task not shown in the snippet

If only file1a matches the pattern, what is the best way to skip file2a and file3a under patternA? And if file1a and file2a both match, I want to run them in parallel and skip file3a.

My file tasks run Python scripts via BashOperator.

Thanks for the help! :)

Tags: airflow

Solution


You can use the BranchPythonOperator to skip tasks.

More details here.
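
Below is a minimal sketch of that idea applied to the patternA side of the question. It assumes the input files land under the question's filePath and that each file maps one-to-one to a task_id; the mapping, the choose_pattern_a_files callable, and the no_match_a fallback task are hypothetical names, not part of the original DAG. The branch callable returns the task_id(s) to run (recent Airflow versions also accept a list of task_ids), and every other task directly downstream of the branch is skipped.

import os

def choose_pattern_a_files():
    # Hypothetical file -> task_id mapping; adjust to your file-name patterns.
    mapping = {
        'file1a.csv': 'file1a',
        'file2a.csv': 'file2a',
        'file3a.csv': 'file3a',
    }
    # Keep only the tasks whose input file actually arrived.
    matched = [task_id for name, task_id in mapping.items()
               if os.path.exists(os.path.join(filePath, name))]
    # Fall through to a (hypothetical) DummyOperator when nothing matched,
    # since the branch must return at least one valid task_id.
    return matched or 'no_match_a'

# Replaces the plain DummyOperator patternA from the question with a real branch.
patternA = BranchPythonOperator(
    task_id='pattern_A',
    python_callable=choose_pattern_a_files,
    dag=dag,
)

Returning ['file1a', 'file2a'] runs those two tasks in parallel and skips file3a, which is exactly the behavior asked for. Because skipped states propagate downstream, keep a trigger rule such as 'none_failed' on the final join task (rerun in the question) so it still runs when some of the file tasks were skipped.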

