snowflake-cloud-data-platform - 流+任务缺少插入?
问题描述
我们在一张桌子上设置了一个流,它通过雪管连续加载。
我们将这些数据用于每分钟运行一次的任务,我们合并到另一个表中。存在重复键的可能性,因此我们使用 ROW_NUMBER() 窗口函数,按文件创建的时间戳降序排列,其中 row_num=1。这样我们总能得到最新的插入
最初,我们使用带有合并语句的标准任务,但我们注意到在某些情况下,由于雪管不保证按文件暂存时的顺序加载,因此我们使用旧数据更新行。因此,在 WHEN MATCHED 部分,我们添加了一个条件,因此只有当文件创建 ts > existing 时,才能更新行
但是,既然我们这样做了,对帐检查显示缺少一些新插入。我不确定为什么更改匹配的子句会干扰不匹配的子句。
我的理论是,额外的子句为任务运行增加了一些时间,其中一些运行被跳过,或者下一次运行几乎在最后一次完成后立即发生。这个想法是丢失的行被夹在中间并且偏移量在它们被消耗之前发生了变化
因此,我们将任务更改为调用使用显式事务的存储过程。我们这样做是因为文档似乎建议使用事务将锁定流。然而,即使这样,我们也可以看到新的插入仍然丢失。我们说的是非常小的数字,例如十万分之八
任何想法可能会发生什么?
下面的示例任务代码(不是 sp 版本)
WAREHOUSE = TASK_WH
SCHEDULE = '1 minute'
WHEN SYSTEM$stream_has_data('my_stream')
AS
MERGE INTO processed_data pd USING (
select
ms.*,
CASE WHEN ms.status IS NULL THEN 1/mv.count ELSE NULL END as pending_count,
CASE WHEN ms.status='COMPLETE' THEN 1/mv.count ELSE NULL END as completed_count
from my_stream ms
JOIN my_view mv ON mv.id = ms.id
qualify
row_number() over (
partition by
id
order by
file_created DESC
) = 1
) ms ON ms.id = pd.id
WHEN NOT MATCHED THEN INSERT (col1, col2, col3,... )
VALUES (ms.col1, ms.col2, ms.col3,...)
WHEN MATCHED AND ms.file_created >= pd.file_created THEN UPDATE SET pd.col1 = ms.col1, pd.col2 = ms.col2, pd.col3 = ms.col3, ....
;
解决方案
该条件"AND ms.file_created >= pd.file_created"
似乎是作为一种机制添加的,以避免多次更新同一行。
可以使用替代方法IS DISTINCT FROM
将源列与目标列进行比较(id 除外):
MERGE INTO processed_data pd USING (
select
ms.*,
CASE WHEN ms.status IS NULL THEN 1/mv.count ELSE NULL END as pending_count,
CASE WHEN ms.status='COMPLETE' THEN 1/mv.count ELSE NULL END as completed_count
from my_stream ms
JOIN my_view mv ON mv.id = ms.id
qualify
row_number() over (
partition by
id
order by
file_created DESC
) = 1
) ms ON ms.id = pd.id
WHEN NOT MATCHED THEN INSERT (col1, col2, col3,... )
VALUES (ms.col1, ms.col2, ms.col3,...)
WHEN MATCHED
AND (pd.col1, pd.col2,..., pd.coln) IS DISTINCT FROM (ms.col1, ms.col2,..., ms.coln)
THEN UPDATE SET pd.col1 = ms.col1, pd.col2 = ms.col2, pd.col3 = ms.col3, ....;
当没有任何变化时,这种方法还将阻止更新行。
推荐阅读
- angular - 我可以选择从明天开始到该日期起 3 个月的 ion2-calendar 范围吗?
- docker - 如何使用 no_proxy 设置设置 docker
- facebook - 在移动登录时:无法加载 URL:此 URL 的域不包含在应用程序的域中
- r - rstudio 中的 pheatmap 错误“维数不正确”
- c# - 如何从专有信息中获取日期
- python - 如何使用与 Excel 的 SUMIFS、COUNTIFS、AVERAGEIFS 函数等效的 Pandas 创建新的 Dataframe 列?
- regex - 使用 Ansible 替换 YAML 文件中的特定字符串
- r - R未来的多会话限制CPU数量
- django - 决定在 Django Rest Framework 上检索数据的模型
- python - Python 多处理失控内存