airflow - 气流 - 如果传感器发生故障,则跳过 DAG
问题描述
我有一个过程,我每周都在等待一个文件,但是这个文件在其名称中带有时间戳,以表示 mesurment 的日期。所以我知道我这周会有一些东西,名称可以是 2020-05-25*.csv 到 2020-05-31*.csv。
我发现使用气流启动流程的唯一方法是在开始@daily 时运行传感器,并使用执行日期来查找是否有文件。问题是,由于我不知道文件将在哪一天上传,我将有 6 个失败的传感器,因此 6 个失败的 DAG,1 个成功。
SFTP 传感器部分示例:
with DAG(
"geometrie-sftp-to-safe",
default_args=default_args,
schedule_interval="@daily",
catchup=True,
) as dag:
starting_sensor = DummyOperator(
task_id="starting_sensor"
)
sensor_sftp_A = SFTPSensor(
task_id="sensor_sftp_A",
path="/input/geometrie/prod/Track_Geometry-{{ ds_nodash }}_A.csv",
sftp_conn_id="ssh_ftp_landing",
poke_interval=60,
soft_fail=True,
mode="reschedule"
)
第二个使用 GCSSensor
with DAG(
"geometrie-preprocessing",
default_args=default_args,
schedule_interval="@daily",
catchup=True
) as dag:
# File A
sensor_gcs_A = GoogleCloudStorageObjectSensor(
task_id="gcs-sensor_A",
bucket="lisea-mesea-sea-cloud-safe",
object="geometrie/original/track_geometry_{{ ds_nodash }}_A.csv",
google_cloud_conn_id="gcp_conn",
poke_interval=50
)
这就是为什么我希望将 DAG 设置为跳过的原因,当且仅当传感器发生故障时。如果是别的,我想要一个真正的失败。
解决方案
Airflow 有多个传感器,可以感应目录以检查定义的文件。schedule_interval as None 将适用于您的用例,因为您希望 DAG 仅在收到文件时触发(考虑到文件可以在一周内的任何时间收到)。
下面的 GCSSensor 示例将感知特定类型文件的存储桶并打印文件名。我很确定 SFTP 传感器应该以相同的方式工作。
dag = DAG(
dag_id='sensing-bucket',
schedule_interval=None,
default_args=args)
def new_file_detection(**context):
value = context['ti'].xcom_pull(task_ids='list_Files')
print('value is : '+str(value))
File_sensor = GoogleCloudStoragePrefixSensor(
task_id='gcs_polling',
bucket='lisea-mesea-sea-cloud-safe',
prefix='geometrie/original/track_geometry_',
dag=dag
)
GCS_File_list = GoogleCloudStorageListOperator(
task_id='list_Files',
bucket='lisea-mesea-sea-cloud-safe',
prefix='geometrie/original/track_geometry_',
delimiter='.csv',
google_cloud_storage_conn_id='google_cloud_default',
dag=dag
)
File_detection = PythonOperator(
task_id='print_detected_filename',
provide_context=True,
python_callable=new_file_detection,
dag=dag
)
File_sensor >> GCS_File_list >> File_detection
推荐阅读
- r - 使用 dplyr 对多个帐户进行分组
- javascript - 具有初始值的 Typescript Map 类构造函数不接受 2 种不同类型
- javascript - 在 100% 客户端网站 (JavaScript) 上隐藏令牌
- python - 我的递归函数似乎是使用原始列表而不是在 python 中创建一个新列表
- javascript - React Native - 多选将类组件转换为功能组件
- embedded - 更改函数调用的位置会中断 uart 上的通信
- php - 雄辩的 whereDate 子句使用日期值作为列
- powershell - 删除重复的数据行
- bootstrap-4 - Bootstrap Carousel循环通过活动存储图片Rails 6
- asp.net - 路由配置 - MVC