sql - Amazon Redshift 中的动态合并语句 (upsert)
问题描述
我想与您分享我为 Amazon Redshift 制作的东西,它是 SCD(缓慢变化的维度)类型 2 的动态合并语句。
该声明假设预先存在:
- 数据库中的两个模式 - dbimports和repo。dbimports 模式用作暂存区,repo 将成为 SCD 类型 2 维度和事实的目标。
- repo 和 dbimports 模式中的表具有相同的名称以及来自源数据的相同列名。
- repo模式中的表将具有预设的主键。
- repo模式中的表将具有三个附加列 - scd_key、is_active、inserted_date。scd_key 是标识列,如果未强制执行主键约束,则可以省略。
请注意,我故意没有添加end_date列,因为我发现它在我的特定项目的情况下不可用。
该语句的作用:
- 使用一个参数 (table_name varchar(256))在repo模式中创建一个存储过程。
- 从 table_name 参数动态获取要合并到的表名,并在 repo 模式中 dbimports 表与其镜像表之间的匹配行上对 is_active 标志列(设置为 0)执行 UPDATE 语句。
- 动态定义 UPDATE 语句的连接条件。
- 使用 is_active 标志设置为 1 和当前日期 (GETDATE()) 从 dbimports 到 repo 模式进行 INSERT * INTO。
我知道如果您在普通 SQL 环境中构建它,这不是一项艰巨的任务,但您可能知道,Redshift 的 SQL 是“高度修改的”。这意味着大多数普通 SQL 的功能都像变量和触发器一样被删除,这使得这项任务很难弄清楚(至少对我来说是这样)。
希望这对任何人都有帮助。祝你有个好的一天。
解决方案
CREATE OR REPLACE PROCEDURE repo.inserter(table_name IN varchar(256))
AS $$
DECLARE
temp_table_name varchar := table_name || '_temp';
one_liner varchar := temp_table_name || '_one_liner';
query varchar;
query_builder varchar;
rec record;
BEGIN
EXECUTE 'DROP TABLE IF EXISTS ' || temp_table_name;
EXECUTE 'DROP TABLE IF EXISTS ' || one_liner;
EXECUTE 'CREATE TEMP TABLE ' || temp_table_name || ' (column_name varchar(256),ordinal_position int,is_nullable varchar(3));';
EXECUTE 'INSERT INTO ' || temp_table_name || '(column_name,ordinal_position,is_nullable) SELECT column_name,ordinal_position,is_nullable FROM svv_columns WHERE table_schema = ''repo'' AND table_name = ''' || table_name || ''';';
EXECUTE 'CREATE TEMP TABLE ' || one_liner || ' (columns_string varchar(1024));';
EXECUTE 'INSERT INTO ' || one_liner || ' SELECT LISTAGG(column_name,'','') WITHIN GROUP (order by ordinal_position) FROM ' || temp_table_name || ' where column_name not in ( ''scd_key'',''inserted_date'',''is_active'');';
query := 'SELECT column_name FROM ' || temp_table_name || ' WHERE is_nullable = ''NO'' and column_name not in(''scd_key'',''inserted_date'') order by ordinal_position;';
query_builder := 'UPDATE repo.' || table_name || ' SET is_active = 0 FROM dbimports.' || table_name || ' b WHERE ';
FOR rec IN EXECUTE query LOOP
query_builder := query_builder || 'repo.' || table_name || '.' || rec.column_name || ' = b.' || rec.column_name || ' AND ';
END LOOP;
query_builder := RTRIM(query_builder,'AND ') || ';';
EXECUTE query_builder;
query := 'SELECT columns_string FROM ' || one_liner || ';';
FOR rec IN EXECUTE query LOOP
query_builder := 'INSERT INTO repo.' || table_name || '(' || rec.columns_string || ',is_active,inserted_date) (SELECT ' || rec.columns_string || ',1 as is_active,GETDATE() as inserted_date FROM dbimports.' || table_name || ');';
END LOOP;
EXECUTE query_builder;
END;
$$ LANGUAGE plpgsql;
推荐阅读
- java - 在 JPA 中实现互斥关系的最佳方法是什么?
- database - Teradata CLIv2 与 JDBC 和 ODBC 的优势?
- javascript - 如何在 API 请求中从 HTML / UI 向服务器发送标头?
- c# - Unity - 对象引用在编辑器中设置但未在游戏中设置
- java - AlertDialogs 一个接一个地显示/关闭
- python - 在AES解密中将三个状态组合成一轮
- c# - Windows 错误报告 LocalDumps 在尝试创建注册表时返回 null
- multithreading - 当安排太多工作时,NiFi 如何处理节流或排队?
- apache - 从 https:// 重定向到 https://www
- php - How to get any first heading tags in post and display the result?