首页 > 解决方案 > 使用 Kafka 流/KSQL 加入表?

问题描述

我正在导入一个数据库,其中包含一些表示多对多和一对多关系的链接表。

现在让我们关注一对多的关系。例如,一个 Biossay 可以有多个文档,但一个文档只能有一个 BioAssay。

因此,我有一个 BioAssay [BioAssay, ..., ..., ...] 表和一个链接表 [Document, BioAssay]。

最终,我需要将这两个加入到完整的 BioAssay 及其所有文档中,例如 [BioAssayxyz, ...., "Document1:Document2:Document3"]

我想知道这里是否有人可以让我了解 Kafka 流需要发生什么?

1 - 到目前为止,根据我对 Kafka 流的理解,我似乎需要为每个链接表创建一个流,以便执行聚合。KTable 将无法使用,因为每个键都会更新记录。然而,聚合的结果可能是在 Ktable 中。

2 - 然后是加入外键的问题。似乎唯一的方法是通过 GlobalKtable。链接表主题-> 链接表流-> 链接表GlobaKTable。这可能会导致大量磁盘空间使用,因为我的表非常大。这是一个超大的数据库,有很多表,对数据构建多个逻辑视图的需求是项目核心的一部分,无法避免。

a)我在这里理解吗?

b)这是解决这个问题的唯一方法吗?

编辑1

听起来唯一存在的是 KStream-to-GlobalKTable,似乎我需要把事情颠倒一下。我原来的 DB BioAssay Table 需要变成流,而我的链接文档表需要先变成流进行聚合,然后变成 GlobalKTable 用于加入。

无论哪种方式,除非我的流只有一个分区,否则这可能非常昂贵。

标签: apache-kafkaapache-kafka-streamsksqldb

解决方案


几个月前,我碰巧用 Kafka Streams 处理过一个类似的用例,我很高兴分享我的经验。

按照您的建议使用 KStreams-to-KTable 会有点工作,尽管有一些您可能无法接受的警告。

首先,回想一下,只有当在流端接收到新事件时,Kafka Streams 才会更新流到表连接,而不是在 ktable 端。

其次,假设您使用 CDC 来导入数据库,那么我的理解是您无法保证更新在 Kafka 上的顺序。这意味着,即使您在 DB 端享受事务隔离,“一次”在表 Document 和 BioAssay 上出现更新或插入,但在 Kafka 端,您会以任意顺序收到一个,然后是另一个。

以上两点希望能清楚说明为什么 Kafka Streams 端的连接结果可能不会像您期望的那样反映数据库内容。

我采取的解决方案是“隐藏”并使用处理器 API 手动加入我的流。这允许实现表到表的连接语义,每当更新任何一方时都会更新。我在那篇博文中描述了核心思想:

https://svend.kelesia.com/one-to-many-kafka-streams-ktable-join.html

使用该技术,我能够从 DB 中正确导入一对多和多对多关系。


推荐阅读