airflow - 气流跳过任务的最佳方式?
问题描述
我正在尝试独立并行运行任务,
我看起来像这样:
---> patternA ---> file1a
---> file2a
---> file3a
sensor ---> move_csv ---> result_mv ---> rerun_dag
---> patternB ---> file1b
---> file2b
---> file3b
我的 dag.py:
sensor = FileSensor(
task_id="sensor ",
filepath=filePath,
fs_conn_id='airflow_db',
poke_interval=10,
dag=dag,
)
move_csv = BranchPythonOperator(
task_id='move_csv',
python_callable=moveCsvFile,
trigger_rule='none_failed',
dag=dag,
)
result_mv = BranchPythonOperator(
task_id='result_mv',
python_callable=result,
trigger_rule='none_failed',
dag=dag,
)
pattern_A = DummyOperator(
task_id="pattern_A ",
dag=dag,
)
pattern_B = DummyOperator(
task_id="pattern_B ",
dag=dag,
)
file1 = BashOperator(
task_id="file1a ",
bash_command='python3 '+scriptPath+'file1.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file2 = BashOperator(
task_id="file2a",
bash_command='python3 '+scriptPath+'file2.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file3 = BashOperator(
task_id="file3a",
bash_command='python3 '+scriptPath+'file3.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file1 = BashOperator(
task_id="file1b ",
bash_command='python3 '+scriptPath+'file1b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file2 = BashOperator(
task_id="file2b",
bash_command='python3 '+scriptPath+'file2b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
file3 = BashOperator(
task_id="file3b",
bash_command='python3 '+scriptPath+'file3b.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
move_csv.set_upstream(sensor)
result_mv.set_upstream(move_csv)
patternA.set_upstream(result_mv)
patternB.set_upstream(result_mv)
file1a.set_upstream(patternA)
file2a.set_upstream(patternA)
file3a.set_upstream(patternA)
file1b.set_upstream(patternB)
file2b.set_upstream(patternB)
file3b.set_upstream(patternB)
rerun.set_uptstream( from all file ...)
如果我只有 file1a 匹配模式,那么在 patternA 中跳过 file2a 和 file3a 的最佳方法是什么?如果我有 file1a 和 file2a 匹配,我想并行运行它们并跳过 file3a。
我的文件任务正在使用 BashOperator 运行 python 脚本调用。
感谢帮助 !:)
解决方案
您可以BranchOperator
用于跳过任务
更多细节在这里
推荐阅读
- makefile - 如何为构建目录中创建的任何库归档编写一个模式规则?
- telegram - Telethon - 更改会话文件路径
- string - Haskell:如何将整数列表与字符串索引匹配
- javascript - 将数组的可观察对象转换为可观察对象的数组
- reactjs - 我想使用 jsPDF 在 PDF 的每一页中显示每个数组数据。有可能的?
- sql-server - Azure 托管实例 :: 为什么要列出扩展存储过程?
- email - SendGrid 发送邮件后返回成功信息,但邮件未发送
- java - 在 html 标签上使用正则表达式,但保留分隔符
- google-apps-script - Google App Script 从命名范围获取(MID)字符串
- python - 最后的幸存者 - 代码战