snowflake-cloud-data-platform - 如何将增量json格式数据从一个表转换为另一个表格格式的表
问题描述
在我的 Airflow ETL 中,我将镶木地板文件从 AWS-S3 加载到 Snowflake 的表(raw_data)中,作为 varient 类型的单列。现在我想将表 raw_data 中的 json 格式值转换为列格式到另一个表中。每天,当 ETL 运行时,它会将新的 parquet 文件从 S3 加载到 Snowflake 的 raw_data 表中。我如何从这个 raw_data 运行转换,以便它只需要增量数据。由于 raw_data 只有一列类型为 varient。我无法弄清楚仅考虑新行的任何逻辑。
提前致谢
解决方案
这是一个非常常见的用例。
最好的情况是您的 JSON 中有一个可靠的时间戳列。在这种情况下,您不必修改现有表。
如果你没有这样的字段,你可以在你的表中添加一个时间戳列raw_data
。
alter table raw_data add column added_at timestamp default current_timestamp::timestamp;
这个时间戳列也应该在您的目标表中得到解决。
alter table target_data add column last_sync_at timestamp;
真正的增量应该与您的数据转换一起使用。
with transformed as (
select
json_column:top_level_json_key.lower_level_key::data_type as table_column_name,
...
-- do this if you have a reliable incremental timestamp key
json_column:record_generation_key::timestamp as last_sync_at
-- or do this if you don't have this key
added_at as last_sync_at
from raw_data
where
last_sync_at > (select max(last_sync_at) from target_data)
)
select *
from transformed
然后,您只需将整个结果包装在合并/插入中。
这当然只是一个思考的方向。真正的实现完全取决于您的用例。
干杯。
推荐阅读
- php - Laravel 7 在 chrome 上首次登录后重定向问题
- c++ - 从编译时向量中删除相邻重复项的元程序
- apache-kafka - 卡夫卡消费者协调
- docker - 生成了不正确的 docker 清单
- performance - 性能工具(灯塔)在我的 Gatsby 网站上显示非常缓慢的“交互时间”。我该如何改进它?
- ibm-cloud - Traefikv2.3 出现“无法列出 *v1beta1.IngressClass: ingressclasses.networking.k8s.io”错误
- laravel - Laravel Blade 指令添加额外的引号
- mysql - 加载本地 infile 查询运行成功,但数据未上传到服务器表 Mysql 8.0
- excel - 单元格编辑期间 Application.OnTime 调用的宏的“正常”Excel行为是什么?
- html - 使用 div 在 CSS 元素周围创建空白区域