首页 > 解决方案 > 如何将增量json格式数据从一个表转换为另一个表格格式的表

问题描述

在我的 Airflow ETL 中,我将镶木地板文件从 AWS-S3 加载到 Snowflake 的表(raw_data)中,作为 varient 类型的单列。现在我想将表 raw_data 中的 json 格式值转换为列格式到另一个表中。每天,当 ETL 运行时,它会将新的 parquet 文件从 S3 加载到 Snowflake 的 raw_data 表中。我如何从这个 raw_data 运行转换,以便它只需要增量数据。由于 raw_data 只有一列类型为 varient。我无法弄清楚仅考虑新行的任何逻辑。

提前致谢

标签: snowflake-cloud-data-platform

解决方案


这是一个非常常见的用例。

最好的情况是您的 JSON 中有一个可靠的时间戳列。在这种情况下,您不必修改现有表。

如果你没有这样的字段,你可以在你的表中添加一个时间戳列raw_data

alter table raw_data add column added_at timestamp default current_timestamp::timestamp;

这个时间戳列也应该在您的目标表中得到解决。

alter table target_data add column last_sync_at timestamp;

真正的增量应该与您的数据转换一起使用。

with transformed as (

  select
    json_column:top_level_json_key.lower_level_key::data_type as table_column_name,
    ...
    -- do this if you have a reliable incremental timestamp key
    json_column:record_generation_key::timestamp as last_sync_at
    -- or do this if you don't have this key
    added_at as last_sync_at

  from raw_data

  where
    last_sync_at > (select max(last_sync_at) from target_data)

)

select *
from transformed

然后,您只需将整个结果包装在合并/插入中。

这当然只是一个思考的方向。真正的实现完全取决于您的用例。

干杯。


推荐阅读