首页 > 解决方案 > 数据摄取:将动态文件从 S3 加载到 Snowflake

问题描述

情况:每个月都有一个 csv 进入 AWS S3。供应商可以随意添加/删除/修改文件中的列。所以架构是不提前知道的。要求是在 Snowflake 中动态创建一个表并将数据加载到所述表中。Matillion 是我们的 ELT 工具。

这是我到目前为止所做的。

  1. 设置一个 Lambda 来检测文件的到达,将其转换为 JSON,上传到另一个 S3 目录并将文件名添加到 SQS。
  2. Matillion 检测 SQS 消息并将带有 JSON 数据的文件加载到 SF 表的 Variant 列中。
  3. SF Stored proc 采用变量列并根据 JSON 数据中的字段数生成一个表。SF 中的 VARIANT 列仅在其 JSON 数据时以这种方式工作。遗憾的是不支持 CSV。

这适用于 10,000 行。当我使用超过 1GB(超过 10M 行)的完整文件运行它时,就会出现问题。它在运行时因磁盘空间不足错误而使 lambda 作业崩溃。

这些是我到目前为止想到的替代方案:

  1. 将 EFS 卷附加到 lambda,并在上传到 S3 之前使用它来存储 JSON 文件。JSON 数据文件比它们的 CSV 对应文件大得多,我预计 json 文件大约为 10-20GB,因为该文件有超过 10M 行。
  2. Matillion 有一个 Excel 查询组件,它可以在其中获取标题并动态创建表格并加载文件。我在想我可以将 CSV 中的标题行转换为 Lambda 中的 XLX 文件,将其传递给 Matillion,让它创建结构,然后在创建结构后加载 csv 文件。

我在这里还有哪些其他选择?考虑因素包括用于未来大型 CSV 或类似需求的良好可重复设计模式、EFS 成本、我是否充分利用了可用的工具?谢谢!!!

标签: pythonamazon-web-serviceslambdasnowflake-cloud-data-platformmatillion

解决方案


为什么要将 CSV 转换为 JSON;CSV 直接加载到表中,无需进行 JSON 特殊需要的任何数据转换,横向展平以将 json 转换为关系数据行;以及为什么不使用 Snowflake Snowpipe 功能将数据直接加载到 Snowflake 中而不使用 Matallion。您可以在加载到 Snowflake 之前将大型 csv 文件拆分为较小的块;这将有助于跨 SF Warehouse 分配数据处理负载。


推荐阅读