airflow - 气流文件传感器与特定文件模式匹配
问题描述
我正在使用气流,我有 1 个由文件传感器开始的 dag,它运行良好,但我需要一个条件来匹配文件的特定模式。我看到了 OmegaFileSensor 但我无法导入它(可能已弃用?)。有人已经在使用我想要的东西了吗?
我的天:
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator
scriptAirflow = '/home/alexw/scriptAirflow/'
uploadPath= '/apps/lv-manuf2020-data/80_DATA/00_Loading/'
default_args = {
'owner': 'manuf2020',
'start_date': dt.datetime(2020, 2, 17),
'retries': 1
}
dag = DAG('lv-manuf2020', default_args=default_args, description='airflow_manuf2020',
schedule_interval=None, catchup=False)
sensor_file = FileSensor(
task_id="sensor_file",
filepath=uploadPath,
fs_conn_id='airflow_db',
dag=dag,
)
move_file = BashOperator(
task_id="move_file",
bash_command='python3 '+scriptAirflow+'movingFiles.py "{{ execution_date }}"',
trigger_rule='one_success',
dag=dag,
)
run_Sh_Script = BashOperator(
task_id='run_scriptSh',
bash_command='python3 '+scriptAirflow+'runShScript.py "{{ execution_date }}"',
dag=dag,
)
rerun_dag=TriggerDagRunOperator(
task_id='rerun_dag',
trigger_dag_id='lv-manuf2020',
dag=dag,
)
move_file.set_upstream(sensor_file)
run_Sh_Script.set_upstream(sensor_file)
rerun_dag.set_upstream(sensor_file)
run_Sh_Script.set_upstream(move_file)
rerun_dag.set_upstream(move_file)
rerun_dag.set_upstream(run_Sh_Script)
非常感谢 !
解决方案
由于FileSensor
运算符使用glob
模块来匹配文件模式,因此您可以使用文件名模式或通配符来实现与 OmegaSensor 运算符相同的功能
例如:
uploadPath= '/apps/lv-manuf2020-data/80_DATA/00_Loading/*.ini'
以上将检查ini
目录中的文件。
源代码看这里:https ://github.com/apache/airflow/blob/master/airflow/sensors/filesystem.py#L61
推荐阅读
- php - 使用语言环境时未找到 404
- java - 如何在@Query 注释中使用参数的方法
- azure - Azure VM 加密:aadAppname
- mysql - SQL:从另一个数据集中添加多个列
- jmeter - 如何将值bean shell采样器发送到jmeter中的rest api
- javascript - 有没有办法获取模型元素的坐标?
- python - 如果没有命中,则合并 2 个数据框并添加 NaN
- tkinter - 使用 cx_freeze 创建的 tkinter 应用程序的 MSI 不起作用
- php - 如何从textarea获取多行数据到mysql
- sql - 来自动态列的 SQL 数据透视表