Debezium only reports change events at startup

Problem description

I am using the Debezium Spring Boot starter together with its supplier:

org.springframework.boot:spring-boot-starter-parent:2.4.10
org.springframework.cloud.fn:cdc-debezium-boot-starter:1.0.3
org.springframework.cloud.fn:cdc-debezium-supplier:1.0.3

I have configured SQL Server to enable CDC on the database and on the table to capture, following the Debezium documentation:

-- enable CDC on database
USE my_db;
exec sys.sp_cdc_enable_db;

-- add filegroup to store CDC data
ALTER DATABASE my_db ADD FILEGROUP CDC_DATA;

-- enable CDC on ACTIONS table
USE my_db;
EXEC sys.sp_cdc_enable_table
    @source_schema=N'dbo',
    @source_name=N'ACTIONS',
    @role_name = NULL,
    @filegroup_name=N'CDC_DATA',
    @supports_net_changes=0

-- check if it is enabled
EXEC sys.sp_cdc_help_change_data_capture;

It returns:

source_schema         dbo
source_table          ACTIONS
capture_instance      dbo_ACTIONS
object_id             843150049
source_object_id      725577623
start_lsn             0x000000300000408B003E
end_lsn               NULL
supports_net_changes  false
has_drop_pending      NULL
role_name             NULL
index_name            PK__ACTIONS__546A96891DBD0AB8
filegroup_name        CDC_DATA
create_date           2021-09-30 03:48:03.493
index_column_list     [ACTIONID]
captured_column_list  [ACTIONID], [TMSTAMP], [APUSER], [OSUSER], [COMMENT], [STATUS]

So CDC appears to be enabled for this table.
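
The start_lsn in the output above is printed as one 10-byte hex value, while the Debezium log further down prints LSNs as three colon-separated hex groups (for example 00000030:00002e0a:0028). To compare the two, the hex value can be converted into the colon form. The sketch below assumes the usual 4-byte / 4-byte / 2-byte split of a SQL Server LSN; the class and method names are made up for illustration and are not part of Debezium.

```java
import java.util.Locale;

// Hypothetical helper, not part of Debezium or the SQL Server JDBC driver.
final class LsnFormat {

    // Converts "0x000000300000408B003E" into "00000030:0000408b:003e".
    // Assumes a 10-byte LSN split into 4 + 4 + 2 bytes.
    static String toColonForm(String hexLsn) {
        String hex = hexLsn.startsWith("0x") || hexLsn.startsWith("0X")
                ? hexLsn.substring(2)
                : hexLsn;
        if (hex.length() != 20) {
            throw new IllegalArgumentException("Expected 20 hex digits, got: " + hexLsn);
        }
        hex = hex.toLowerCase(Locale.ROOT);
        return hex.substring(0, 8) + ":" + hex.substring(8, 16) + ":" + hex.substring(16);
    }

    // Compares two LSNs in colon form. Fixed-width lowercase hex sorts
    // lexicographically in the same order as the numbers it encodes.
    static int compare(String a, String b) {
        return a.toLowerCase(Locale.ROOT).compareTo(b.toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        String captureStart = toColonForm("0x000000300000408B003E"); // start_lsn from sp_cdc_help_change_data_capture
        String snapshotCommit = "00000030:00002e0a:0028";            // commit_lsn from the Debezium log
        System.out.println(captureStart);
        System.out.println(compare(captureStart, snapshotCommit) > 0
                ? "start_lsn is after the snapshot commit_lsn"
                : "start_lsn is not after the snapshot commit_lsn");
    }
}
```

With the values from this question, the capture instance's start_lsn sorts after the snapshot's commit_lsn. Whether that ordering has anything to do with the missing events is not something this comparison settles.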

I created a Spring bean that subscribes to the Debezium Flux:

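import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import javax.annotation.PostConstruct;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Data
@Slf4j
@Service
public class LogDatabaseChanges {
    private final Supplier<Flux<Message<?>>> cdcSupplier;

    @PostConstruct
    public void logChangeEvents() {
        cdcSupplier.get().subscribe(t -> {
            log.info("#### {}", new String((byte[]) t.getPayload(), StandardCharsets.UTF_8));
        });
    }
}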

When I start my Spring Boot application, Debezium takes a snapshot and detects the existing rows. These are then written to the log.

However, when I then change the data by inserting a new row or updating a value, nothing is triggered.
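
One way to narrow this down is to check whether SQL Server itself records the change, independently of Debezium. SQL Server writes captured changes to a change table named cdc.<capture_instance>_CT, which for this table is cdc.dbo_ACTIONS_CT. The sketch below counts the rows in that table over plain JDBC. The class name, the method names, and the JDBC URL passed as the first argument are assumptions for illustration; the SQL Server JDBC driver has to be on the classpath for the database call to work.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical diagnostic, not part of Debezium or the CDC starter.
final class ChangeTableCheck {

    // Builds the count query for the change table cdc.<capture_instance>_CT.
    static String countQuery(String captureInstance) {
        // Only allow simple identifiers, since the name is spliced into the SQL text.
        if (!captureInstance.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("Unexpected capture instance name: " + captureInstance);
        }
        return "SELECT COUNT(*) FROM [cdc].[" + captureInstance + "_CT]";
    }

    // Returns the number of rows currently in the change table.
    static long countChangeRows(Connection connection, String captureInstance) throws SQLException {
        try (Statement statement = connection.createStatement();
             ResultSet rows = statement.executeQuery(countQuery(captureInstance))) {
            rows.next();
            return rows.getLong(1);
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length == 0) {
            // No JDBC URL given: only show the query that would be run.
            System.out.println(countQuery("dbo_ACTIONS"));
            return;
        }
        // args[0] is a JDBC URL for the database that has CDC enabled (placeholder).
        try (Connection connection = DriverManager.getConnection(args[0])) {
            System.out.println("rows in change table: " + countChangeRows(connection, "dbo_ACTIONS"));
        }
    }
}
```

Running it before and after an INSERT or UPDATE on dbo.ACTIONS shows whether the count grows. If it does not, the problem is more likely on the SQL Server side than in the Spring bean; the capture job that fills the change table is run by SQL Server Agent, so that is one place to look.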

Here is a sample of the application log:

d.c.s.SqlServerSnapshotChangeEventSource : No previous offset has been found
d.c.s.SqlServerSnapshotChangeEventSource : According to the connector configuration both schema and data will be snapshotted
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 1 - Preparing
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 2 - Determining captured tables
i.d.r.history.DatabaseHistoryMetrics     : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=schema-history,server=foo': debezium.sql_server:type=connector-metrics,context=schema-history,server=foo
io.debezium.util.Threads                 : Requested thread factory for connector SqlServerConnector, id = foo named = change-event-source-coordinator
io.debezium.util.Threads                 : Creating thread debezium-sqlserverconnector-foo-change-event-source-coordinator
i.d.p.ChangeEventSourceCoordinator       : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=snapshot,server=foo': debezium.sql_server:type=connector-metrics,context=snapshot,server=foo
i.d.p.ChangeEventSourceCoordinator       : Unable to register the MBean 'debezium.sql_server:type=connector-metrics,context=streaming,server=foo': debezium.sql_server:type=connector-metrics,context=streaming,server=foo
i.d.p.ChangeEventSourceCoordinator       : Metrics registered
i.d.p.ChangeEventSourceCoordinator       : Context created
d.c.s.SqlServerSnapshotChangeEventSource : No previous offset has been found
d.c.s.SqlServerSnapshotChangeEventSource : According to the connector configuration both schema and data will be snapshotted
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 1 - Preparing
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 2 - Determining captured tables
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 3 - Locking captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Setting locking timeout to 10 s
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 3 - Locking captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Setting locking timeout to 10 s
d.c.s.SqlServerSnapshotChangeEventSource : Executing schema locking
d.c.s.SqlServerSnapshotChangeEventSource : Locking table server_ListeriaWGS.dbo.ACTIONS
d.c.s.SqlServerSnapshotChangeEventSource : Executing schema locking
d.c.s.SqlServerSnapshotChangeEventSource : Locking table server_ListeriaWGS.dbo.ACTIONS
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 4 - Determining snapshot offset
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 5 - Reading structure of captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Reading structure of schema 'server_ListeriaWGS'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 6 - Persisting schema history
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Value' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Value'
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Key' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Key'
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Envelope' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope'
d.c.s.SqlServerSnapshotChangeEventSource : Schema locks released.
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 7 - Snapshotting data
.d.r.RelationalSnapshotChangeEventSource :   Exporting data from table 'server_ListeriaWGS.dbo.ACTIONS'
.d.r.RelationalSnapshotChangeEventSource :   For table 'server_ListeriaWGS.dbo.ACTIONS' using select statement: 'SELECT [ACTIONS].[ACTIONID],[ACTIONS].[TMSTAMP],[ACTIONS].[APUSER],[ACTIONS].[OSUSER],[ACTIONS].[COMMENT],[ACTIONS].[STATUS] FROM [dbo].[ACTIONS]'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 4 - Determining snapshot offset
c.b.n.d.c.a.LogDatabaseChanges           : {"databaseName":"server_ListeriaWGS"}
c.b.n.d.c.a.LogDatabaseChanges           : {"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978886876,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"databaseName":"server_ListeriaWGS","schemaName":"dbo","ddl":null,"tableChanges":[{"type":"CREATE","id":"\"server_ListeriaWGS\".\"dbo\".\"ACTIONS\"","table":{"defaultCharsetName":null,"primaryKeyColumnNames":["ACTIONID"],"columns":[{"name":"ACTIONID","jdbcType":4,"nativeType":null,"typeName":"int","typeExpression":"int","charsetName":null,"length":10,"scale":0,"position":1,"optional":false,"autoIncremented":false,"generated":false},{"name":"TMSTAMP","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":40,"scale":null,"position":2,"optional":true,"autoIncremented":false,"generated":false},{"name":"APUSER","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":80,"scale":null,"position":3,"optional":true,"autoIncremented":false,"generated":false},{"name":"OSUSER","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":80,"scale":null,"position":4,"optional":true,"autoIncremented":false,"generated":false},{"name":"COMMENT","jdbcType":12,"nativeType":null,"typeName":"varchar","typeExpression":"varchar","charsetName":null,"length":150,"scale":null,"position":5,"optional":true,"autoIncremented":false,"generated":false},{"name":"STATUS","jdbcType":4,"nativeType":null,"typeName":"int","typeExpression":"int","charsetName":null,"length":10,"scale":0,"position":6,"optional":true,"autoIncremented":false,"generated":false}]}}]}
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 5 - Reading structure of captured tables
d.c.s.SqlServerSnapshotChangeEventSource : Reading structure of schema 'server_ListeriaWGS'
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 6 - Persisting schema history
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Value' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Value'
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Key' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Key'
i.d.c.sqlserver.SqlServerConnectorTask   : The Kafka Connect schema name 'foo.dbo.ACTIONS.Envelope' is not a valid Avro schema name, so replacing with 'EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope'
o.s.c.f.common.cdc.CdcAutoConfiguration  : [CDC Event]: SourceRecord{sourcePartition=null, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo', kafkaPartition=0, key=Struct{databaseName=server_ListeriaWGS}, keySchema=Schema{io.debezium.connector.sqlserver.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978889724,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},databaseName=server_ListeriaWGS,schemaName=dbo,tableChanges=[Struct{type=CREATE,id="server_ListeriaWGS"."dbo"."ACTIONS",table=Struct{primaryKeyColumnNames=[ACTIONID],columns=[Struct{name=ACTIONID,jdbcType=4,typeName=int,typeExpression=int,length=10,scale=0,position=1,optional=false,autoIncremented=false,generated=false}, Struct{name=TMSTAMP,jdbcType=12,typeName=varchar,typeExpression=varchar,length=40,position=2,optional=true,autoIncremented=false,generated=false}, Struct{name=APUSER,jdbcType=12,typeName=varchar,typeExpression=varchar,length=80,position=3,optional=true,autoIncremented=false,generated=false}, Struct{name=OSUSER,jdbcType=12,typeName=varchar,typeExpression=varchar,length=80,position=4,optional=true,autoIncremented=false,generated=false}, Struct{name=COMMENT,jdbcType=12,typeName=varchar,typeExpression=varchar,length=150,position=5,optional=true,autoIncremented=false,generated=false}, Struct{name=STATUS,jdbcType=4,typeName=int,typeExpression=int,length=10,scale=0,position=6,optional=true,autoIncremented=false,generated=false}]}}]}, valueSchema=Schema{io.debezium.connector.sqlserver.SchemaChangeValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
d.c.s.SqlServerSnapshotChangeEventSource : Schema locks released.
.d.r.RelationalSnapshotChangeEventSource : Snapshot step 7 - Snapshotting data
.d.r.RelationalSnapshotChangeEventSource :   Exporting data from table 'server_ListeriaWGS.dbo.ACTIONS'
.d.r.RelationalSnapshotChangeEventSource :   For table 'server_ListeriaWGS.dbo.ACTIONS' using select statement: 'SELECT [ACTIONS].[ACTIONID],[ACTIONS].[TMSTAMP],[ACTIONS].[APUSER],[ACTIONS].[OSUSER],[ACTIONS].[COMMENT],[ACTIONS].[STATUS] FROM [dbo].[ACTIONS]'
.d.r.RelationalSnapshotChangeEventSource :   Finished exporting 3 records for table 'server_ListeriaWGS.dbo.ACTIONS'; total duration '00:00:02,886'
.d.r.RelationalSnapshotChangeEventSource :   Finished exporting 3 records for table 'server_ListeriaWGS.dbo.ACTIONS'; total duration '00:00:00,23'
.d.p.s.AbstractSnapshotChangeEventSource : Snapshot - Final stage
c.b.n.d.c.a.LogDatabaseChanges           : {"ACTIONID":456}
c.b.n.d.c.a.LogDatabaseChanges           : {"before":null,"after":{"ACTIONID":456,"TMSTAMP":"789","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890036,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890043,"transaction":null}
c.b.n.d.c.a.LogDatabaseChanges           : {"ACTIONID":1234}
c.b.n.d.c.a.LogDatabaseChanges           : {"before":null,"after":{"ACTIONID":1234,"TMSTAMP":"456","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890049,"snapshot":"true","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890049,"transaction":null}
c.b.n.d.c.a.LogDatabaseChanges           : {"ACTIONID":5645}
c.b.n.d.c.a.LogDatabaseChanges           : {"before":null,"after":{"ACTIONID":5645,"TMSTAMP":"15654","APUSER":null,"OSUSER":null,"COMMENT":null,"STATUS":null},"source":{"version":"1.3.0.Final","connector":"sqlserver","name":"foo","ts_ms":1632978890049,"snapshot":"last","db":"server_ListeriaWGS","schema":"dbo","table":"ACTIONS","change_lsn":null,"commit_lsn":"00000030:00002e0a:0028","event_serial_no":null},"op":"r","ts_ms":1632978890049,"transaction":null}
o.s.c.f.common.cdc.CdcAutoConfiguration  : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=456}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=456,TMSTAMP=789},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890259,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
o.s.c.f.common.cdc.CdcAutoConfiguration  : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=false}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=1234}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=1234,TMSTAMP=456},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890260,snapshot=true,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
o.s.c.f.common.cdc.CdcAutoConfiguration  : [CDC Event]: SourceRecord{sourcePartition={server=foo}, sourceOffset={commit_lsn=00000030:00002e0a:0028, snapshot=true, snapshot_completed=true}} ConnectRecord{topic='foo.dbo.ACTIONS', kafkaPartition=null, key=Struct{ACTIONID=5645}, keySchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Key:STRUCT}, value=Struct{after=Struct{ACTIONID=5645,TMSTAMP=15654},source=Struct{version=1.3.0.Final,connector=sqlserver,name=foo,ts_ms=1632978890260,snapshot=last,db=server_ListeriaWGS,schema=dbo,table=ACTIONS,commit_lsn=00000030:00002e0a:0028},op=r,ts_ms=1632978890260}, valueSchema=Schema{EC2AMAZ_DU501KO.dbo.ACTIONS.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
.d.p.s.AbstractSnapshotChangeEventSource : Snapshot - Final stage
d.c.s.SqlServerSnapshotChangeEventSource : Removing locking timeout
d.c.s.SqlServerSnapshotChangeEventSource : Removing locking timeout
i.d.p.ChangeEventSourceCoordinator       : Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=foo, changeLsn=NULL, commitLsn=00000030:00002e0a:0028, eventSerialNo=null, snapshot=FALSE, sourceTime=2021-09-30T05:14:50.049Z], partition={server=foo}, snapshotCompleted=true, eventSerialNo=1]]
.d.p.m.StreamingChangeEventSourceMetrics : Connected metrics set to 'true'
i.d.p.ChangeEventSourceCoordinator       : Starting streaming
i.d.p.ChangeEventSourceCoordinator       : Snapshot ended with SnapshotResult [status=COMPLETED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=foo, changeLsn=NULL, commitLsn=00000030:00002e0a:0028, eventSerialNo=null, snapshot=FALSE, sourceTime=2021-09-30T05:14:50.260Z], partition={server=foo}, snapshotCompleted=true, eventSerialNo=1]]
.d.p.m.StreamingChangeEventSourceMetrics : Connected metrics set to 'true'
i.d.p.ChangeEventSourceCoordinator       : Starting streaming
.c.s.SqlServerStreamingChangeEventSource : Last position recorded in offsets is 00000030:00002e0a:0028(NULL)[1]
.c.s.SqlServerStreamingChangeEventSource : Last position recorded in offsets is 00000030:00002e0a:0028(NULL)[1]

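As you can see, the log contains several lines from c.b.n.d.c.a.LogDatabaseChanges, so the subscription works at startup. But once the snapshot has completed, nothing more is received.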
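
Every row event logged by LogDatabaseChanges above carries "op":"r", which Debezium uses for rows read during a snapshot; streamed inserts, updates and deletes would show "c", "u" and "d". To make that distinction visible in the subscriber's log, the operation code can be pulled out of each payload. A minimal sketch follows. It uses a regular expression instead of a JSON library to stay dependency-free, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the logged payloads; not part of Debezium.
final class ChangeEventOp {

    // Finds the first "op" field with a one-letter string value.
    // A JSON parser would be more robust, e.g. if a column were itself named "op".
    private static final Pattern OP_FIELD = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    // Extracts the operation code, if the payload has one (key and schema payloads do not).
    static Optional<String> opOf(byte[] payload) {
        Matcher matcher = OP_FIELD.matcher(new String(payload, StandardCharsets.UTF_8));
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    // Maps Debezium's operation codes to a readable label.
    static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "other (" + op + ")";
        }
    }

    public static void main(String[] args) {
        // Shortened copy of one payload from the log above.
        String json = "{\"before\":null,\"after\":{\"ACTIONID\":456,\"TMSTAMP\":\"789\"},"
                + "\"op\":\"r\",\"ts_ms\":1632978890043}";
        byte[] payload = json.getBytes(StandardCharsets.UTF_8);
        System.out.println(opOf(payload).map(ChangeEventOp::describe).orElse("no op field"));
    }
}
```

In the subscriber, the same call could be applied to (byte[]) t.getPayload() before logging, so that a streamed change is easy to tell apart from a snapshot row if one ever arrives.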

What am I missing?

Tags: sql-server, spring-boot, debezium
