python - 数据摄取:将动态文件从 S3 加载到 Snowflake
问题描述
情况:每个月都有一个 csv 进入 AWS S3。供应商可以随意添加/删除/修改文件中的列。所以架构是不提前知道的。要求是在 Snowflake 中动态创建一个表并将数据加载到所述表中。Matillion 是我们的 ELT 工具。
这是我到目前为止所做的。
- 设置一个 Lambda 来检测文件的到达,将其转换为 JSON,上传到另一个 S3 目录并将文件名添加到 SQS。
- Matillion 检测 SQS 消息并将带有 JSON 数据的文件加载到 SF 表的 Variant 列中。
- SF Stored proc 采用变量列并根据 JSON 数据中的字段数生成一个表。SF 中的 VARIANT 列仅在其 JSON 数据时以这种方式工作。遗憾的是不支持 CSV。
这适用于 10,000 行。当我使用超过 1GB(超过 10M 行)的完整文件运行它时,就会出现问题。它在运行时因磁盘空间不足错误而使 lambda 作业崩溃。
这些是我到目前为止想到的替代方案:
- 将 EFS 卷附加到 lambda,并在上传到 S3 之前使用它来存储 JSON 文件。JSON 数据文件比它们的 CSV 对应文件大得多,我预计 json 文件大约为 10-20GB,因为该文件有超过 10M 行。
- Matillion 有一个 Excel 查询组件,它可以在其中获取标题并动态创建表格并加载文件。我在想我可以将 CSV 中的标题行转换为 Lambda 中的 XLX 文件,将其传递给 Matillion,让它创建结构,然后在创建结构后加载 csv 文件。
我在这里还有哪些其他选择?考虑因素包括用于未来大型 CSV 或类似需求的良好可重复设计模式、EFS 成本、我是否充分利用了可用的工具?谢谢!!!
解决方案
为什么要将 CSV 转换为 JSON;CSV 直接加载到表中,无需进行 JSON 特殊需要的任何数据转换,横向展平以将 json 转换为关系数据行;以及为什么不使用 Snowflake Snowpipe 功能将数据直接加载到 Snowflake 中而不使用 Matallion。您可以在加载到 Snowflake 之前将大型 csv 文件拆分为较小的块;这将有助于跨 SF Warehouse 分配数据处理负载。
推荐阅读
- python - Python API Post HMAC - 有效负载不匹配签名请求
- android - 如何在 SQLite 中显示从数据库到活动的查询?
- html - 设置 div 的 %width 会取消 div 的 float 属性
- c# - C# Power Point Add In VSTO 读取视图事件
- react-hooks - 当搜索栏通过钩子激活时如何显示透明背景?
- flutter - 简单的颤动对齐问题 - 网格上按钮内的文本
- javascript - 使用 javascript 判断当前浏览器是否登录了 microsoft 帐户
- javascript - 如何在react js中使用material-ui和react router重叠组件?
- cloud-foundry - 如何在 bootstrap.yml 中引用 Hashicorp Vault Service Broker 注入的环境变量
- python-3.x - 如何测试模块初始化