sql - 如果我们试图将数据拟合到数据保险库模型中,如何在雪花合并语句中的 WHEN MATCHED THEN 上插入一行?
问题描述
我正在将数据加载到雪花数据保险库建模数据库中。当行的字段已更新时,模型的工作方式如下:
- 将此行的加载结束日期设置为等于
current_timestamp()
。 - 将具有新值的同一行再次添加到模型中。
我merge
在 JavaScript 过程中使用来自 Snowflake 的命令来执行此操作:
var observarion_query = "MERGE INTO HUB_OBSERVATION AS OBS "+
"USING (SELECT DATE(T.$"+OBSERVATION_DATE+", 'DD/MM/YYYY') AS OBS_DATE, T.$"+LOCATIONS+", T.$"+SUBMISSION_TIME+" FROM "+FILE_FULL_PATH+"(FILE_FORMAT=>"+FILE_FORMAT_NAME+") T) ST "+
"ON md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))) = OBS.OBSERVATION_DATE_LOCATION_HASH_KEY "+
"WHEN MATCHED THEN UPDATE SET OBS.LOAD_END_DT = CURRENT_TIMESTAMP() "+
"WHEN NOT MATCHED THEN "+
"INSERT (OBSERVATION_DATE_LOCATION_HASH_KEY, LOAD_DT, LOAD_END_DT, RECORD_SRC, OBSERVATION_DATE, LOCATION_NAME) "+
"VALUES (md5(CONCAT(ST.OBS_DATE, CONCAT('CAMP', CONCAT(ST.$"+LOCATION_POSITION+", ST.$"+SUBMISSION_TIME+")))), current_timestamp(), NULL, 'ONA', ST.OBS_DATE, CONCAT('CAMP', ST.$"+LOCATION_POSITION+")) ";
问题出在 内WHEN MATCHED THEN
,我需要用它的新值对同一行进行插入,但有额外的条件说:
WHEN MATCHED and OBS.REVIEW_STATUS <> ST.REVIEW_STATUS THEN
// INSERT THE ROW
而且我真的知道我们不能在WHEN MATCHED THEN
语句中执行插入查询。
我们怎样才能找到转机呢?
解决方案
如果我正确理解您的问题,您希望单个源行可能对目标表造成 2 项操作:
- 将 LOAD_END_DT 更新为当前时间戳(如果键已存在于目标表中且 LOAD_END_DT = NULL,则表示它是“当前”)
- 使用最新信息在目标表中插入新行
您可以通过使用 UNION ALL 将 USING 子句中的每个源行“拆分”为 2 行来实现此目的:一行用于 UPDATE,一行用于 INSERT。我通常包含一个布尔标志来区分它们(因为它们是重复的)。在 INSERT 部分,我执行SELECT ... WHERE NOT EXISTS (SELECT 1 FROM target WHERE key = key and MD5() = MD5())以便仅当新行与当前行。我的 ON 子句在表示 UPDATE 场景的布尔值上有一个过滤器。
[编辑以包含示例 MERGE]
首先,假设以下阶段和最终表定义:
CREATE OR REPLACE TRANSIENT TABLE T_STAGE (
ID INTEGER
,COL1 VARCHAR
,COL2 VARCHAR
,COL3 VARCHAR
)
;
CREATE OR REPLACE TRANSIENT TABLE T_FINAL (
ID INTEGER
,START_TS TIMESTAMP_LTZ
,END_TS TIMESTAMP_LTZ
,COL1 VARCHAR
,COL2 VARCHAR
,COL3 VARCHAR
,COL_MD5_HASH VARCHAR
)
;
此 MERGE 说明了使用 UNION ALL 将单个源行拆分为 2,以便可以对目标表应用 INSERT 和 UPDATE:
MERGE INTO T_FINAL AS TGT
USING (
WITH CTE_X AS (
SELECT ID
,COL1
,COL2
,COL3
,MD5(ARRAY_TO_STRING(ARRAY_CONSTRUCT(ID, COL1, COL2, COL3), '^')) AS COL_MD5_HASH
FROM T_STAGE
)
SELECT FALSE AS UPDATE_FLAG
,X.ID
,X.COL1
,X.COL2
,X.COL3
,X.COL_MD5_HASH
FROM CTE_X X
WHERE NOT EXISTS (
SELECT 1
FROM T_FINAL T2
WHERE T2.COL_MD5_HASH = X.COL_MD5_HASH
)
UNION ALL
SELECT TRUE AS UPDATE_FLAG
,X.ID
,X.COL1
,X.COL2
,X.COL3
,X.COL_MD5_HASH
FROM CTE_X X
JOIN T_FINAL T3
ON T3.END_TS IS NULL
AND T3.ID = X.ID
AND T3.COL_MD5_HASH != X.COL_MD5_HASH
) AS SRC
ON TGT.END_TS IS NULL
AND SRC.ID = TGT.ID
AND SRC.UPDATE_FLAG
WHEN NOT MATCHED THEN INSERT (ID, START_TS, END_TS, COL1, COL2, COL3, COL_MD5_HASH)
VALUES (SRC.ID, CURRENT_TIMESTAMP(), NULL, SRC.COL1, SRC.COL2, SRC.COL3, SRC.COL_MD5_HASH)
WHEN MATCHED THEN UPDATE SET END_TS = CURRENT_TIMESTAMP()
;
根据您的规格,有许多假设和变化。例如,如果在每次 MERGE 后清除阶段表行,则可以删除 NOT EXISTS ......它只是为了避免多次插入同一阶段行。您必须进行调整以匹配您的规格。这仅用于说明目的(因为您提出了要求)。
推荐阅读
- html - 选择下拉菜单后如何更改输入值
- ios - iOS:Twilo 通话在后台或锁定屏幕几秒钟后断开连接
- javascript - NodeJS Json 文件不断回滚
- r - 在 R 中确定每天运行的进程数和开始这些项目的平均天数
- visual-studio - 从 Visual Studio 2019 的源代码构建 cmake
- python - 二项分布:如何计算Alpha,使概率被置信区间覆盖?
- reactjs - Form.Control.Feedback 不适用于文件输入
- c# - 是否有一种内置方法可以使用 LINQ 对列表列表进行相交?
- spring - 升级到 2.4.0 后,Spring 云配置客户端未从配置服务器获取/加载配置文件
- svg - 动画跳到所需结果的结尾