首页 > 技术文章 > sqlserver cdc实现数据增量抽取

zzchao 原文

--创建测试库
create database test;
--创建配置表
create table test..time_config(tb varchar(20) PRIMARY KEY,enddate binary(10));
--创建业务表
create table test..TB_s (ID INT PRIMARY KEY,NAME VARCHAR(20)); --原表
create table test..TB_t (ID INT PRIMARY KEY,NAME VARCHAR(20),ISDELETED INT); --目标表

--给配置表初始时间
insert into test..time_config
select 'TB_t' as tb,max(start_lsn) as enddate from test.[cdc].[lsn_time_mapping]

--开启cdc
use TEST
GO
EXEC sys.sp_cdc_enable_db --开启库级别cdc
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'TB_s', @role_name = null; --开启表cdc
GO


--增量实现存储过程

create proc cdc_tb as
declare 
@time_begin binary(10),
@time_end binary(10)

select @time_begin=max(enddate) from test.dbo.time_config    --上次结束时间,即本次开始时间
select @time_end=max(start_lsn) from test.[cdc].[lsn_time_mapping]    --获取最大时间,即本次结束时间

--抽取增量数据
select ID,NAME,CASE WHEN __$operation=1 then 1 else 0 end as isdeleted into #tb_import from(
select row_number()over(partition by id order by [__$start_lsn] desc,__$seqval,__$operation desc ) as rn,* 
from test.[cdc].[dbo_TB_s_CT] 
where [__$start_lsn]>@time_begin and [__$start_lsn]<=@time_end) t1 where rn=1;

delete from test..TB_t where exists (select * from #tb_import);
insert into test..TB_t select * from #tb_import;

update test.dbo.time_config set enddate= @time_end where tb='TB_t';--将本次结束时间存入配置表;
declare @count int
select @count=count(1) from #tb_import 
print('更新'+cast(@count as varchar(10))+'条记录')

--测试添加:
insert into TB_s
select 1,'aaa'
--修改
update TB_s set name='bbb' where id=1
--删除
delete from TB_s where id=1

select * from TB_s

exec cdc_tb --创建job 定时作业

select * from TB_t

 

推荐阅读