Airflow FileSensor matching a specific file pattern

Question

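I'm using Airflow and have a DAG that is started by a FileSensor. It works well, but I need a condition so that it only fires for files matching a specific name pattern. I found OmegaFileSensor, but I can't import it (maybe it's deprecated?). Is anyone already doing something like this?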

My DAG:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator

scriptAirflow = '/home/alexw/scriptAirflow/'
uploadPath= '/apps/lv-manuf2020-data/80_DATA/00_Loading/'


default_args = {
    'owner': 'manuf2020',
    'start_date': dt.datetime(2020, 2, 17),
    'retries': 1
}

dag = DAG('lv-manuf2020', default_args=default_args, description='airflow_manuf2020',
          schedule_interval=None, catchup=False)

sensor_file = FileSensor(
    task_id="sensor_file",
    filepath=uploadPath,
    fs_conn_id='airflow_db',
    dag=dag,
)
move_file = BashOperator(
    task_id="move_file",
    bash_command='python3 '+scriptAirflow+'movingFiles.py "{{ execution_date }}"',
    trigger_rule='one_success',
    dag=dag,
)

run_Sh_Script = BashOperator(
    task_id='run_scriptSh',
    bash_command='python3 '+scriptAirflow+'runShScript.py "{{ execution_date }}"',
    dag=dag,
)

rerun_dag=TriggerDagRunOperator(
    task_id='rerun_dag',
    trigger_dag_id='lv-manuf2020',
    dag=dag,
)

move_file.set_upstream(sensor_file)
run_Sh_Script.set_upstream(sensor_file)
rerun_dag.set_upstream(sensor_file)
run_Sh_Script.set_upstream(move_file)
rerun_dag.set_upstream(move_file)
rerun_dag.set_upstream(run_Sh_Script)

Thanks a lot!

Tags: airflow

Answer


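Since the FileSensor operator matches its filepath with the glob module, you can put a file name pattern or wildcard directly into filepath and get the same effect as the OmegaFileSensor operator.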

For example:

uploadPath= '/apps/lv-manuf2020-data/80_DATA/00_Loading/*.ini'

With this, the sensor waits for any .ini file in that directory.
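Glob patterns are shell-style wildcards, not regular expressions: * matches any run of characters, ? exactly one character, and [0-9] one character from a set. Below is a minimal sketch for checking what a pattern accepts before putting it into filepath. It uses Python's fnmatch.fnmatchcase, which understands the same wildcard syntax as glob. The file names and the matching helper are hypothetical, made up for illustration. One difference: glob also skips names that start with a dot unless the pattern itself starts with a dot, and this check does not model that.

```python
from fnmatch import fnmatchcase

# Hypothetical file names that might arrive in the upload directory
candidates = ["config.ini", "config.ini.bak", "report_2020.csv",
              "report_20.csv", "data_7.txt", "data_x.txt"]

def matching(pattern, names):
    """Return the names accepted by a shell-style wildcard pattern (case-sensitive)."""
    return [name for name in names if fnmatchcase(name, pattern)]

print(matching("*.ini", candidates))            # ['config.ini']
print(matching("report_????.csv", candidates))  # ['report_2020.csv']
print(matching("data_[0-9]*.txt", candidates))  # ['data_7.txt']
```

fnmatchcase is used instead of fnmatch so that the result is case-sensitive on every platform.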

See the source code here: https://github.com/apache/airflow/blob/master/airflow/sensors/filesystem.py#L61
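The linked poke() joins the connection's base path with filepath and globs the result. It succeeds on the first match that is a file, or on a matched directory that contains files. The sketch below is a rough, stand-alone approximation of that logic with no Airflow dependency. The name file_pattern_present is hypothetical, and options of the real sensor (such as recursive matching in newer versions) are left out. Because os.path.join discards the base path when the second argument is absolute, an absolute uploadPath like the one in the question is used as-is.

```python
import glob
import os
import tempfile

def file_pattern_present(base_path, filepath):
    """Hypothetical approximation of a glob-based FileSensor poke, not Airflow's code."""
    full_path = os.path.join(base_path, filepath)
    for path in glob.glob(full_path):
        if os.path.isfile(path):
            return True  # a matching regular file satisfies the sensor
        # a matching directory counts only if something inside it is a file
        for _, _, files in os.walk(path):
            if files:
                return True
    return False

with tempfile.TemporaryDirectory() as upload_dir:
    before = file_pattern_present(upload_dir, "*.ini")    # empty directory
    open(os.path.join(upload_dir, "notes.txt"), "w").close()
    only_txt = file_pattern_present(upload_dir, "*.ini")  # a non-matching file only
    open(os.path.join(upload_dir, "settings.ini"), "w").close()
    after = file_pattern_present(upload_dir, "*.ini")     # a matching file exists

print(before, only_txt, after)  # False False True
```

Because of this, a file that does not match the pattern (notes.txt here) does not wake the sensor. That is the behaviour the question asks for.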

