首页 > 解决方案 > 流+任务缺少插入?

问题描述

我们在一张桌子上设置了一个流,它通过雪管连续加载。

我们将这些数据用于每分钟运行一次的任务,我们合并到另一个表中。存在重复键的可能性,因此我们使用 ROW_NUMBER() 窗口函数,按文件创建的时间戳降序排列,其中 row_num=1。这样我们总能得到最新的插入

最初,我们使用带有合并语句的标准任务,但我们注意到在某些情况下,由于雪管不保证按文件暂存时的顺序加载,因此我们使用旧数据更新行。因此,在 WHEN MATCHED 部分,我们添加了一个条件,因此只有当文件创建 ts > existing 时,才能更新行

但是,既然我们这样做了,对帐检查显示缺少一些新插入。我不确定为什么更改匹配的子句会干扰不匹配的子句。

我的理论是,额外的子句为任务运行增加了一些时间,其中一些运行被跳过,或者下一次运行几乎在最后一次完成后立即发生。这个想法是丢失的行被夹在中间并且偏移量在它们被消耗之前发生了变化

因此,我们将任务更改为调用使用显式事务的存储过程。我们这样做是因为文档似乎建议使用事务将锁定流。然而,即使这样,我们也可以看到新的插入仍然丢失。我们说的是非常小的数字,例如十万分之八

任何想法可能会发生什么?

下面的示例任务代码(不是 sp 版本)

WAREHOUSE = TASK_WH
SCHEDULE = '1 minute'
WHEN SYSTEM$stream_has_data('my_stream')
AS
MERGE INTO processed_data pd USING (
  select 
  ms.*, 
  CASE WHEN ms.status IS NULL THEN 1/mv.count ELSE NULL END as pending_count, 
  CASE WHEN ms.status='COMPLETE' THEN 1/mv.count ELSE NULL END as completed_count
  from my_stream ms
  JOIN my_view mv ON mv.id = ms.id
  qualify
    row_number() over (
      partition by 
        id
      order by
        file_created DESC
    ) = 1
) ms ON ms.id = pd.id
    WHEN NOT MATCHED THEN INSERT (col1, col2, col3,...  )
                          VALUES (ms.col1, ms.col2, ms.col3,...)    
WHEN MATCHED AND ms.file_created >= pd.file_created THEN UPDATE SET pd.col1 = ms.col1, pd.col2 = ms.col2, pd.col3 = ms.col3, ....
;

标签: snowflake-cloud-data-platform

解决方案


该条件"AND ms.file_created >= pd.file_created"似乎是作为一种机制添加的,以避免多次更新同一行。

可以使用替代方法IS DISTINCT FROM将源列与目标列进行比较(id 除外):

MERGE INTO processed_data pd USING (
  select 
  ms.*, 
  CASE WHEN ms.status IS NULL THEN 1/mv.count ELSE NULL END as pending_count, 
  CASE WHEN ms.status='COMPLETE' THEN 1/mv.count ELSE NULL END as completed_count
  from my_stream ms
  JOIN my_view mv ON mv.id = ms.id
  qualify
    row_number() over (
      partition by 
        id
      order by
        file_created DESC
    ) = 1
) ms ON ms.id = pd.id
    WHEN NOT MATCHED THEN INSERT (col1, col2, col3,...  )
                          VALUES (ms.col1, ms.col2, ms.col3,...)    
WHEN MATCHED 
 AND (pd.col1, pd.col2,..., pd.coln) IS DISTINCT FROM (ms.col1, ms.col2,..., ms.coln)
THEN UPDATE SET pd.col1 = ms.col1, pd.col2 = ms.col2, pd.col3 = ms.col3, ....;

当没有任何变化时,这种方法还将阻止更新行。


推荐阅读