amazon-s3 - 基于事件在将文件放入 S3 存储桶时触发和运行气流任务
问题描述
是否可以仅在特定事件发生时运行气流任务,例如将文件放入特定 S3 存储桶的事件。类似于 AWS Lambda 事件的东西
有,S3KeySensor
但我不知道它是否符合我的要求(仅在事件发生时运行任务)
这是使问题更清楚的示例:
我有一个传感器对象如下
sensor = S3KeySensor(
task_id='run_on_every_file_drop',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='my-sensor-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag
)
使用上述传感器对象,传感器任务的气流行为如下:
my-sensor-bucket
即使在气流管理 UI 中切换 DAG 之前,如果 S3 存储桶中已经存在与通配符匹配的对象名称,则ON
运行任务(由于存在过去的 s3 对象,我不想运行任务)- 运行一次后,只要有新的 S3 文件对象丢弃,传感器任务就不会再次运行(我想在桶中每次有新的 S3 文件对象丢弃时运行 DAG 中的传感器任务和后续任务
my-sensor-bucket
) - 如果您配置调度程序,任务将基于调度而不是基于事件运行。所以在这种情况下,调度程序似乎不是一个选项
我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,或者不能像基于事件的管道(类似于 AWS Lambda)那样设置
解决方案
气流基本上是围绕基于时间的调度组织的。
您可以通过以下几种方式破解以获得您想要的东西:
- 假设您在 S3 上有一个 SQS 事件,它会触发一个调用气流 API 以触发 dag 运行的 AWS Lambda。
- 您可以使用 SQS 传感器启动 DAG,当它收到 s3 更改事件时,它只会继续 DAG 的其余部分(有关重新调度,请参见 3_1 和 3_2)。
- 您可以使用传感器(如您展示的那个)启动 DAG,它不选择要运行的任务,它只是传递给下一个相关任务或超时。您必须删除使传感器匹配的密钥。
- 您通过使最终任务重新触发 DAG 来重新运行。
- 或者将计划间隔设置为每分钟,没有追赶,最大活动 DAG 运行设置为 1。这样一次运行将处于活动状态,传感器将保持它直到超时。如果完成或超时,下一次运行将在一分钟内开始。
如果您使用路线 3,您将在 DAG 及其传感器的下一次运行之前删除通过传感器的键。请注意,由于 S3 最终一致性,路由 1 和 2 更可靠。
推荐阅读
- crystal-reports - 字符串和日期时间错误消息 - 在同一公式字段上使用时
- reactjs - 具有多个版本的 graphql 的 gatsby - 纱线分辨率
- c# - Unity Webgl 游戏从未完成加载。浏览器选项卡冻结
- zend-framework - 如何在网格视图中显示表单
- r - R CMD build 与 devtools::build() (在顶层找到的非标准文件/目录)
- c# - 如何获取具有键名“$oid”的参数的所有子json的父键名?
- javascript - 在每次脚本执行时在文件中发送相同的 json
- python - 如何保存有状态的张量流 keras 模型的状态?
- node.js - 在 travis yml 配置中从 json 文件设置全局环境变量?
- jmeter - 简单的数据驱动负载测试