apache-kafka - ksqlDB:将流连接到嵌套结构并接收到 postgresql
问题描述
我们io.debezium.connector.postgresql.PostgresConnector
在 postgersql db 上设置了源连接器(),它将监听 3 个表并将数据更改转发到 Kafka 主题。
并为这 3 个主题创建了流。以下是运行良好并收听表格的流列表。
从主题创建的流“Stream_Project”projects
+-----------+-------------+
|ProjectId |ProjectName |
+-------------------------+
|1 |Project 1 |
|2 |Project 2 |
+-----------+-------------+
从主题创建的流“Stream_Skill”数据skills
+-----------+-------------+-------------+-------------+
|SkillId |ProjectId |SkillName |Proficiency |
+-----------+-------------+-------------+-------------+
|1 |1 |Skill 101 |L1 |
|2 |1 |Skill 102 |L2 |
|3 |2 |Skill 201 |L1 |
|4 |2 |Skill 202 |L2 |
+-----------+-------------+-------------+-------------+
从主题创建的流“Stream_Tech”数据techs
+-----------+-------------+-------------+
|TechId |ProjectId |TechName |
+-----------+-------------+-------------+
|1 |1 |Tech 101 |
|2 |1 |Tech 102 |
|3 |2 |Tech 201 |
|4 |2 |Tech 202 |
+-----------+-------------+-------------+
现在我正在尝试加入所有流以在新的流/表中获得如下结果,我可以将其Sink
连接到另一个 PostgresSql 数据库。但我不确定如何通过加入所有 3 个流数据获得如下结果。
+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+
|ProjectId |ProjectName |Skills |Techs |
+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+
|1 |Project 1 |[{"SkillName":"Skill 101","Proficiency":"L1"},{"SkillName":"Skill 102","Proficiency":"L2"}] |[{"TechName":"Tech 101"},{"TechName":"Tech 102"}] |
|2 |Project 2 |[{"SkillName":"Skill 201","Proficiency":"L1"},{"SkillName":"Skill 202","Proficiency":"L2"}] |[{"TechName":"Tech 201"},{"TechName":"Tech 202"}] |
+-----------+-------------+--------------------------------------------------------------------------------------------+--------------------------------------------------+
任何人都可以指导我或提供生成上述输出的方法,ksqlDB
谢谢
解决方案
推荐阅读
- graphhopper - Graphhopper Route optimization api - 我们可以使用时间窗口约束作为软约束吗
- java - 按 JRadioButton 对 JTable 进行排序
- python-3.x - NETCDF4 文件不会超过 2GB
- android - 在 Android Studio 的模拟器窗口中看不到更多选项
- c# - 像类型一样使用,但它是一个命名空间
- rust - 语句的右侧在做什么?
- javascript - 如何将循环项目包装在
- 使用模板文字标记
- tags - 如何知道我应该使用哪种数据类型才能登录标签?
- python - RuntimeError:张量a(4144)的大小必须与非单维3站点的张量b(256)的大小匹配:stackoverflow.com
- flutter - 如何在飞镖的某个时间间隔内一个接一个地显示两个小部件