首页 > 解决方案 > Airflow - 在本地写入文件的任务 (GCS)

问题描述

在过去几年使用 AWS DataPipeline 之后,我正在 Airflow 中构建一些管道。我有几个问题我很模糊,希望得到一些澄清。对于上下文,我使用的是 Google Cloud Composer。

在 DataPipeline 中,我经常会创建带有一些类似这样的任务的 DAG:

  1. 获取数据
  2. 转换数据
  3. 在某处写入数据

在此过程中的每一步,我都可以定义一个inputNode和/或一个outputNode. 这些输出节点将在本地挂载到任务运行程序,并且一旦任务完成,本地写入的任何文件都将上传到定义为outputNode.

现在,在 Airflow 中,我认为没有相同的概念,对吧?

问:如果我在气流任务中本地编写文件,它们会去哪里?我假设它们只是驻留在任务运行器上,假设它在任务完成后不会自行破坏?

似乎在 AWS DP 中我可以挂载一个outputNode,执行以下操作:

f = open("hello.txt", "a")
f.write("world")
f.close()

任务完成后,文件hello.txt将上传到 s3 存储桶。但是在 Airflow 中,如果我做同样的事情,文件只会放在运行任务的运行器上吗?

问:我应该考虑以不同的方式编写任务吗?好像如果我的文件需要去某个地方,我必须在任务中明确地做到这一点。跟进:如果是这种情况,我是否应该在将本地创建的文件上传到存储后删除它们,或者监控这些文件在我的跑步者身上占用的空间量?

对于从 AWS DP 迁移到 Airflow 的人的任何推荐阅读,您发现有用的材料将不胜感激。

谢谢!

编辑

当我继续研究时,根据这个文档,GCS 和 Composer 似乎做了类似的事情。您的作曲家环境中的 /data 目录似乎安装在集群中的所有节点上/home/airflow/gcs/data

测试我能够确认是这种情况。

标签: airflowamazon-data-pipelinegoogle-cloud-composeraws-data-pipeline

解决方案


考虑将任务之间的数据写入数据湖 (GCS),以便这些任务可以在未来某个时间重新运行……如果您想更改算法并重新运行一年历史数据的最后一步数据。


推荐阅读