sql-server - Debezium SqlServerConnector 不会将更改从 DB 刷新到 Kafka
问题描述
我正在尝试通过 Debezium SqlServerConnector 将 MSSQL CDC 更改加载到 Kafka。
对于 MSSQL CDC 测试,testDB 的创建如下所述:https
://github.com/debezium/debezium-examples/blob/master/tutorial/debezium-sqlserver-init/inventory.sql#L32
但使用@supports_net_changes = 1;
sys.sp_cdc_help_change_data_capture 使用当前设置返回表。
执行以下操作后:
INSERT INTO products(name,description,weight) VALUES ('box','this is a cool box',7.0);
GO
UPDATE products SET name = 'cool box' WHERE name = 'box'
GO
UPDATE products SET name = 'box' WHERE name = 'cool box'
GO
DELETE FROM products WHERE description LIKE '%cool box%';
并通过以下脚本读取 CDC 更改:
DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);
-- Obtain the beginning of the time interval.
SET @begin_time = GETDATE() -1;
-- Obtain the end of the time interval.
SET @end_time = GETDATE();
-- Map the time interval to a change data capture query range.
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_products(@from_lsn, @to_lsn, N'all');
我在结果表中查看正确的 2-4-4-1 状态更改。
Debezium Connect Plugin的设置如下:
name=DbBssConnector
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.hostname=host01
database.port=1433
database.user=sa
database.password=*******
database.dbname=testDB
database.server.name=fullfillment
database.history.kafka.bootstrap.servers=172.26.26.142:9092
database.history.kafka.topic=dbhistory.fullfillment
errors.log.enable=true
database.history.skip.unparseable.ddl=true
time.precision.mode=connect
在此之前,我将 Kafka Connect 运行为:
sudo bin/connect-standalone.sh config/connect-standalone.properties myconfig.properties
但我看到以下日志:
INFO WorkerSourceTask{id=DbBssConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487)
插件不会从日志中加载更改,尽管它们在 CDC 日志中(见上文)
有什么问题?
解决方案
推荐阅读
- c# - ASP.NET Core JwtBearerIdentity 不起作用
- javascript - 评论系统与点击星
- python - 是否可以使用用 R 和 Python 训练的模型
- javascript - 修复 d3 js Sankey 中接收器节点的垂直对齐
- php - 用于非常大文件的 laravel 的 xls 解析器
- php - imap_fetchbody 与 Gmail api 获取方法
- polymer - 更新聚合物 JS 中的属性值时面临的问题
- java - 如何从 JAR 中的类路径读取文件?
- c - 根据偏移量和长度在数组中设置 bin 字段
- php - 如何访问侦听器中保存的实体?