首页 > 解决方案 > ksqlDB:将流连接到嵌套结构并接收到 postgresql

问题描述

我们io.debezium.connector.postgresql.PostgresConnector在 postgersql db 上设置了源连接器(),它将监听 3 个表并将数据更改转发到 Kafka 主题。

并为这 3 个主题创建了流。以下是运行良好并收听表格的流列表。

从主题创建的流“Stream_Project”projects

+-----------+-------------+
|ProjectId  |ProjectName  |
+-------------------------+
|1          |Project 1    |
|2          |Project 2    |
+-----------+-------------+

从主题创建的流“Stream_Skill”数据skills

+-----------+-------------+-------------+-------------+
|SkillId    |ProjectId    |SkillName    |Proficiency  |
+-----------+-------------+-------------+-------------+
|1          |1            |Skill 101    |L1           |
|2          |1            |Skill 102    |L2           |
|3          |2            |Skill 201    |L1           |
|4          |2            |Skill 202    |L2           |
+-----------+-------------+-------------+-------------+

从主题创建的流“Stream_Tech”数据techs

+-----------+-------------+-------------+
|TechId     |ProjectId    |TechName     |
+-----------+-------------+-------------+
|1          |1            |Tech 101     |
|2          |1            |Tech 102     |
|3          |2            |Tech 201     |
|4          |2            |Tech 202     |
+-----------+-------------+-------------+

现在我正在尝试加入所有流以在新的流/表中获得如下结果,我可以将其Sink连接到另一个 PostgresSql 数据库。但我不确定如何通过加入所有 3 个流数据获得如下结果。

+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+
|ProjectId  |ProjectName  |Skills                                                                                      |Techs                                             |
+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+
|1          |Project 1    |[{"SkillName":"Skill 101","Proficiency":"L1"},{"SkillName":"Skill 102","Proficiency":"L2"}] |[{"TechName":"Tech 101"},{"TechName":"Tech 102"}] |
|2          |Project 2    |[{"SkillName":"Skill 201","Proficiency":"L1"},{"SkillName":"Skill 202","Proficiency":"L2"}] |[{"TechName":"Tech 201"},{"TechName":"Tech 202"}] |
+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+

任何人都可以指导我或提供生成上述输出的方法,ksqlDB 谢谢

标签: apache-kafkaconfluent-platformksqldb

解决方案


推荐阅读