首页 > 解决方案 > 如何使用 Cloud Composer 将大数据从 Postgres 导出到 S3?

问题描述

我一直在使用 Postgres to S3 运算符将数据从 Postgres 加载到 S3。但最近,我不得不导出一个非常大的表,我的 Airflow 作曲家在没有任何日志的情况下失败,这可能是因为我们正在使用 Python 的 tempfile 模块的 NamedTemporaryFile 函数来创建一个临时文件,并且我们正在使用这个临时文件加载到 S3 . 由于我们使用的是 Composer,这将被加载到 Composer 的本地内存中,并且由于文件的大小非常大,所以它会失败。

请参阅此处:https ://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_emitting_logs

我确实检查了 RedshiftToS3 运算符,因为它也使用 Postgres 钩子,它有几个卸载选项可以轻松加载大文件,但我意识到 Redshift 和 Postgres 之间没有 1-1 对应关系。所以这是不可能的。有什么办法可以拆分我的 Postgres 查询?现在我正在做SELECT * FROM TABLENAME另外,我没有关于桌子的任何信息。

我也遇到过这个类似的运算符:https ://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/sql_to_gcs.html

这里有一个参数approx_max_file_size_bytes

此运算符支持将大型表转储拆分为多个文件的能力(请参阅上面文件名参数文档中的注释)。此参数允许开发人员指定拆分的文件大小。

我从代码中了解到的是,当大小超过给定限制时,他们正在创建一个新的临时文件,所以他们是否将文件拆分为多个临时文件,然后分别上传?

编辑:我将再次准确解释我想要做什么。目前,Postgres to S3 运算符创建一个临时文件并将光标返回的所有结果写入该文件,这会导致内存问题。所以我在想的是,我可以添加一个 max_file_size 限制,对于游标中的每一行,我会将结果写入我们的临时文件,如果我们的临时文件的大小超过我们设置的 max_file_size 限制,我们将写入我们的内容文件到 S3,然后刷新或删除此文件,然后创建一个新的临时文件并将下一行光标写入此文件并将该文件也上传到 S3。我不确定如何像那样修改运算符?

标签: pythonpostgresqlairflowgoogle-cloud-composer

解决方案


正如您已经发现的那样,这是因为您正在为表中的每一行构建一个字典,当您的表中有很多行时,您的机器上的内存就会不足。

您已经真正回答了自己的问题:只写 a 直到文件达到一定大小,然后将文件推送到 S3。或者,您可以只将文件保留在磁盘上并每 x 行刷新一次字典对象,但在这种情况下,您的文件可能会在磁盘上变得非常大,而不是在内存中。


推荐阅读