首页 > 解决方案 > Airflow S3KeySensor - 如何让它继续运行

问题描述

在这篇 Stackoverflow 帖子的帮助下,我刚刚制作了一个程序(帖子中显示的那个),当一个文件放在 S3 存储桶中时,我正在运行的 DAG 中的一个任务被触发,然后我使用 BashOperator 执行一些工作。一旦完成,尽管 DAG 不再处于运行状态,而是进入成功状态,如果我想让它拾取另一个文件,我需要清除所有“过去”、“未来”、“上游”、“下游的活动。我想制作这个程序,使其始终运行,并且只要将新文件放入 S3 存储桶中,程序就会启动任务。

我可以继续使用 S3KeySenor 来执行此操作,还是需要想办法设置外部触发器来运行我的 DAG?到目前为止,如果我的 S3KeySensor 只运行一次,它就毫无意义。

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)

我想知道这是否不可能,因为它不会是有向无环图,而是会有一个重复传感器 -> t1 -> t2 -> 传感器 -> t1 -> t2 -> 传感器 ->的循环...不断重复

更新:

我的用例非常简单,只要将新文件放在指定的 AWS S3 存储桶中,我就希望触发我的 DAG 并开始执行各种任务。这些任务将执行诸如实例化一个新的 AWS EMR 集群、从 AWS S3 存储桶中提取文件、执行一些 AWS EMR 活动,然后关闭 AWS EMR 集群之类的事情。从那里,DAG 将回到等待状态,等待新文件到达 AWS S3 存储桶,然后无限期地重复该过程。

标签: boto3airflowairflow-scheduler

解决方案


在 Airflow 中,没有映射到始终运行的 DAG 的概念。如果适合您的用例,您可以非常频繁地运行 DAG,例如每 1 到 5 分钟一次。

这里的主要内容是 S3KeySensor 会检查直到它检测到第一个文件存在于密钥的通配符路径中(或超时),然后它才会运行。但是当第二个、第三个或第四个文件着陆时,S3 传感器将已经完成了该 DAG 运行的运行。在下一次 DAG 运行之前,它不会被安排再次运行。(您描述的循环想法大致相当于调度程序在创建 DAG 运行时所做的事情,但不是永远。)

外部触发器听起来绝对是您的用例的最佳方法,无论该触发器来自 Airflow CLI 的trigger_dag命令 ( $ airflow trigger_dag ...):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

或通过 REST API:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

两者都转身并调用trigger_dag通用(实验)API中的函数:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbeedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

例如,您可以设置一个 AWS Lambda 函数,当文件到达 S3 时调用该函数,该函数运行触发器 DAG 调用。


推荐阅读