apache-kafka - KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接
问题描述
伙计们。
先介绍一下场景:
我MS SQL SERVER
通过使用从 a 中的两个表中获取数据Debezium CDC Source Connector
。按照连接器配置:
PROVIDER 表的连接器:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_PROVIDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.PROVIDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-$1',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.PROVIDER'
);
ORDERS 表的连接器:
CREATE SOURCE CONNECTOR SOURCE_MSSQL_01_ORDER WITH (
'connector.class'= 'io.debezium.connector.sqlserver.SqlServerConnector',
'database.hostname'= '<URL>',
'database.port'= '1433',
'database.user'= '<USER>',
'database.password'= '<PASS>',
'database.dbname'= 'a',
'database.server.name'= 'a',
'table.whitelist'='dbo.ORDER',
'decimal.handling.mode'='double',
'transforms'= 'unwrap,addTopicPrefix',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.addTopicPrefix.type'='org.apache.kafka.connect.transforms.RegexRouter',
'transforms.addTopicPrefix.regex'='(.*)',
'transforms.addTopicPrefix.replacement'='mssql-01-$1',
'database.history.kafka.bootstrap.servers'= 'kafka:29092',
'database.history.kafka.topic'= 'dbhistory.ORDER'
);
我认为它可以改进,但目前还可以。
设置连接器后,我们可以创建流和表:
CREATE TABLE PROVIDER (ID_P VARCHAR PRIMARY KEY) WITH (KAFKA_TOPIC='mssql-01-a.dbo.PROVIDER', VALUE_FORMAT='AVRO');
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='mssql-01 a.dbo.ORDERS',VALUE_FORMAT='AVRO');
如您所见,现在它只是使用来自 PROVIDER 表的数据来丰富 ORDERS 流,对吗?是的,但不是。
SELECT P.PROVIDER_COD, O.ID FROM ORDERS AS O JOIN PROVIDER AS P ON O.PROV = P.PROVIDER_COD EMIT CHANGES;
如果我尝试这样做,我会收到一个错误:
无法重新分区 TABLE 源。如果这是一个联接,请确保条件使用 TABLE 的键列 ID_P 而不是 [PROVIDER_COD]
好吧,它应该很容易修复,但在这种情况下并非如此。最后我们解决了我的问题:
Provider's id
不在 中,因为这ORDERS stream
就是我从中获取数据的数据库的设计方式。
我们如何关联这两个数据集?
如果它是一个关系数据库,那将很容易:
SELECT * FROM ORDERS O INNER JOIN PROVIDER P ON O.PROV = P.PROVIDER_COD AND O.SUB_COD = P.SUB_COD;
是的......我之前没有提到它,但我们这里有一个复合键,Provider Code
and Provider' Subsidiary Code
,我认为这是另一个问题。
拜托,谁能帮我理解如何解决这个问题KSQLDB
?
非常感谢。
解决方案
我在 Confluent 论坛上找到了解决方案。
感谢 Matthias J. Sax
推荐阅读
- python - 过滤掉我的查询集的重复项
- wordpress - 更新的数据库 - 现在无法登录到 wordpress 管理员 - 用户名无效
- kendo-ui - 基于复选框的选择不适用于引导主题
- c - 可以使用 mDNS 解析为链接本地地址吗?
- mysql - 如何使用并行插入语句在 MySQL 表中插入巨大的 Pandas 数据框?
- c - 二维矩阵的哪种表示更快
- amazon-web-services - 组织多个项目 (AWS)
- java - 通过 scala.sys.process API 在 spark-scala 中执行外部命令 s3-dist-cp
- mysql - 当我尝试仅从一个用户获取数据时,查询从两个不同的用户获取数据
- python-3.x - 如何在 Python 中对每三行进行线性回归?