首页 > 解决方案 > Debezium SqlServerConnector 不会将更改从 DB 刷新到 Kafka

问题描述

我正在尝试通过 Debezium SqlServerConnector 将 MSSQL CDC 更改加载到 Kafka。

对于 MSSQL CDC 测试,testDB 的创建如下所述:https
://github.com/debezium/debezium-examples/blob/master/tutorial/debezium-sqlserver-init/inventory.sql#L32 但使用@supports_net_changes = 1;

sys.sp_cdc_help_change_data_capture 使用当前设置返回表。

执行以下操作后:

INSERT INTO products(name,description,weight) VALUES ('box','this is a cool box',7.0);
GO
UPDATE products SET name = 'cool box' WHERE name = 'box'
GO
UPDATE products SET name = 'box' WHERE name = 'cool box'
GO
DELETE FROM products WHERE description LIKE '%cool box%';

并通过以下脚本读取 CDC 更改:

DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);  
-- Obtain the beginning of the time interval.  
SET @begin_time = GETDATE() -1;  
-- Obtain the end of the time interval.  
SET @end_time = GETDATE();  
-- Map the time interval to a change data capture query range.  
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);  
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);  

SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_products(@from_lsn, @to_lsn, N'all');

我在结果表中查看正确的 2-4-4-1 状态更改。

Debezium Connect Plugin的设置如下:

name=DbBssConnector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.hostname=host01
database.port=1433
database.user=sa
database.password=*******
database.dbname=testDB
database.server.name=fullfillment
database.history.kafka.bootstrap.servers=172.26.26.142:9092
database.history.kafka.topic=dbhistory.fullfillment
errors.log.enable=true
database.history.skip.unparseable.ddl=true
time.precision.mode=connect

在此之前,我将 Kafka Connect 运行为:

sudo bin/connect-standalone.sh config/connect-standalone.properties myconfig.properties

但我看到以下日志:

INFO WorkerSourceTask{id=DbBssConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)

插件不会从日志中加载更改,尽管它们在 CDC 日志中(见上文)

有什么问题?

标签: sql-serverapache-kafkaapache-kafka-connectdebezium

解决方案


推荐阅读