首页 > 解决方案 > 基于事件在将文件放入 S3 存储桶时触发和运行气流任务

问题描述

是否可以仅在特定事件发生时运行气流任务,例如将文件放入特定 S3 存储桶的事件。类似于 AWS Lambda 事件的东西

有,S3KeySensor但我不知道它是否符合我的要求(仅在事件发生时运行任务)

这是使问题更清楚的示例:

我有一个传感器对象如下

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)

使用上述传感器对象,传感器任务的气流行为如下:

我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,或者不能像基于事件的管道(类似于 AWS Lambda)那样设置

标签: amazon-s3airflow-schedulerairflow

解决方案


气流基本上是围绕基于时间的调度组织的。

您可以通过以下几种方式破解以获得您想要的东西:

  1. 假设您在 S3 上有一个 SQS 事件,它会触发一个调用气流 API 以触发 dag 运行的 AWS Lambda。
  2. 您可以使用 SQS 传感器启动 DAG,当它收到 s3 更改事件时,它只会继续 DAG 的其余部分(有关重新调度,请参见 3_1 和 3_2)。
  3. 您可以使用传感器(如您展示的那个)启动 DAG,它不选择要运行的任务,它只是传递给下一个相关任务或超时。您必须删除使传感器匹配的密钥。
    1. 您通过使最终任务重新触发 DAG 来重新运行。
    2. 或者将计划间隔设置为每分钟,没有追赶,最大活动 DAG 运行设置为 1。这样一次运行将处于活动状态,传感器将保持它直到超时。如果完成或超时,下一次运行将在一分钟内开始。

如果您使用路线 3,您将在 DAG 及其传感器的下一次运行之前删除通过传感器的键。请注意,由于 S3 最终一致性,路由 1 和 2 更可靠。


推荐阅读