首页 > 解决方案 > KSQL 创建表作为选择

问题描述

CREATE TABLE AS SELECT 只能通过分组来完成,但是如何在 Base 中组织一个表(您需要一个没有聚合/分组的简单表)以便用数据补充另一个流?举个例子:有一个带有字段(产品,city_id)的流 A。我们需要一个带有字段(city_id,city_name)的表(或其他东西),由另一个线程补充。

并且有一个流将补充流 A 与表中的名称连接起来。

如何使用外部目录组织数据丰富?

标签: apache-kafkaksqldb

解决方案


您可以通过LATEST_BY_OFFSET这种方式使用聚合来构建数据表。

CREATE STREAM source_city_data 
  WITH (KAFKA_TOPIC='source_city_data', FORMAT='AVRO');

CREATE TABLE city_data AS 
  SELECT city_id, LATEST_BY_OFFSET(city_name) AS city_name 
  FROM source_city_data;

推荐阅读