python - 使用 Airflow 从 S3 进行批处理
问题描述
我从一个每天发送大约 200 万条消息的提要中收到消息。这些消息中的每一条都非常小,不需要以流方式处理它们,因此我一次批处理一整天的消息。
当前的基础设施接收这些消息,将它们组合在一起并将包含 1000 条消息的 gzip 文件上传到 AWS S3。这些文件被命名为 yyyymmdd-hhmmss 格式的日期时间戳。批处理每天运行一次(计划在 Airflow 上),它应该从存储桶中选择新文件并处理它们。目前,我没有为这项工作使用任何挂钩或传感器。
我的问题是;如果文件夹还包含所有前一天的文件,从 S3 收集新文件的最佳方法是什么?
我的低效解决方案是在 S3 上的文件夹中下拉文件列表并处理文件名与我正在处理的日期匹配的文件。我的批处理是一个 Airflow DAG,所以我想保持幂等性,这意味着我不想在处理后从这个 S3 文件夹中删除文件。
理想情况下,我只想选择文件名中日期时间在前一天午夜之后的文件(从执行日期开始)并处理它们,而不必循环浏览完整的文件列表,因为这个列表将不断增长,使每个一天比上一天稍慢。
是否通过 Airflow 或 Python 提供了更好的机制来从 S3 中选择文件?或者有没有一种更有效的方法来执行这样的任务?
解决方案
不知道您的完整基础架构,您是否可以在将 gzip 压缩文件上传到 s3 时按日期进行分区?
包含 /date=yyyymmdd/ 的 s3 文件夹前缀意味着您可以仅检索当天的文件并保持幂等性。
当涉及到您的气流工作时,您将日期作为参数传递以仅检索该 s3 分区中的那些日子。
推荐阅读
- openlayers - 如何在 OpenLayers 地图图层中获取所有先前选择的功能?
- tensorflow - 使用 TensorFlow 分析器测量 GPU 内存使用情况
- scala - Akka Streams 中的连接流
- laravel - AWS S3 拒绝 deleteDirectory() 请求,即使 delete() 有效
- css - 带有 R 包 r2d3 的 D3 脚本的示例 CSS?
- r - gganimate 0.9.9.9 中的累积图
- r - 由线条连接的条形图/如何连接在 R / ggplot2 中使用 grid.arrange 排列的两个图形
- minimatch - TFS 2018 minimatch 模式未能排除测试项目
- php - T-SQL 在创建备份文件后立即删除它
- c# - MongoDB C# 驱动程序 - 将集合序列化为接口