首页 > 解决方案 > KSQL:(拉)-查询连接表

问题描述

我有两个要加入的主题,然后查询加入以获取最新结果。我Create a ksqlDB Table from a ksqlDB Stream这里开始关注文档。

这就是我所做的:

CREATE TABLE CATALOGUE_TABLE
  (title STRING)
  WITH (KAFKA_TOPIC='catalogue-topic-test',
        VALUE_FORMAT='AVRO');
CREATE TABLE SCHEDULE_TABLE
  (fromInstant STRING,
   toInstant STRING)
  WITH (KAFKA_TOPIC='schedule-topic-test',
        VALUE_FORMAT='AVRO');

请放心,这两个基础主题都有其所有条目的键。然后我像这样加入他们:

CREATE TABLE MYTABLE AS
  SELECT c.title, s.fromInstant, s.toInstant
  FROM CATALOGUE_TABLE c
  INNER JOIN SCHEDULE_TABLE s
  ON s.ROWKEY = c.ROWKEY
  EMIT CHANGES;

我不确定我最终会得到什么。不管是什么,我都可以运行以下命令:

select * from MYTABLE EMIT CHANGES;

我可以看到上面的所有更新,以及所有重复项。它基本上是一条溪流。现在,如果我运行以下命令:

select * from MYTABLE WHERE ROWKEY='12';

要获得 id=12 的最后更新,我得到:

表 'MYTABLE' 未实现。有关查询类型的信息,请参阅https://cnfl.io/queries。如果你...

其余的输出被截断,所以我看不到它想说什么。我的猜测是我在 MYTABLE 中做错了什么。

我想我错过了一个 groupBy ,它应该是负责删除所有具有重复 ID 的条目的部分,但我无法弄清楚我需要在那里放什么以及我是否应该只在 MYTABLE 级别这样做,或者是否应该在所有三个表上完成。

标签: apache-kafkaconfluent-platformksqldb

解决方案


目前,即ksqlDB 0.6.0,只有返回表的流聚合查询才允许查询结果表。

对于表-表连接,结果不会具体化到本地存储中,而只会生成更改日志流并将其写入与结果表对应的结果主题。


推荐阅读