首页 > 解决方案 > 从 ksqlDB 中的流创建表中连接的 Rowkey

问题描述

流是:

CREATE STREAM SENSORS_KSTREAM (sensorid INT,
  serialnumber VARCHAR,
  mfgdate VARCHAR,
  productname VARCHAR,
  customerid INT,
  locationid INT,
  macaddress VARCHAR,
  installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');

我用这个创建的表是:

CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate, COUNT(*) AS TOTAL 
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate;

在此处输入图像描述

产生的ROWKEY不是我想要的。

我只想要SENSORID作为行键。

谁能帮我做到这一点。

提前致谢。

PS:我正在使用独立的 Confluent 5.4.0。

标签: apache-kafkaksqldb

解决方案


ksqlDB 将表的主键存储在底层 Kafka 消息的键中。这对于确保重要的事情(例如相同键的一致分区分配和日志压缩)至关重要。

ksqlDB 不支持复合键,尽管这是一个正在开发的功能。因此,与此同时,当您按多列分组时,ksqlDB 会尽其所能并构建您遇到的复合键。不是很好,但它实际上适用于许多用例。

您上面的语句是在主键中创建一个包含许多列的表 - 它们当前都被序列化为单个 STRING 值。

您只要求SENSORID在键中...但是您的 GROUP BY 子句使所有列在键的一部分之后。

在我看来,您有一个包含传感器更新值流的主题。在这种情况下,我建议研究两个选项:

  1. 如果输入主题中的每一行都包含每个传感器的所有数据,那么为什么不将其作为 TABLE 而不是 STREAM 导入:
CREATE TABLE SENSORS_KSTREAM (sensorid INT,
  serialnumber VARCHAR,
  mfgdate VARCHAR,
  productname VARCHAR,
  customerid INT,
  locationid INT,
  macaddress VARCHAR,
  installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
  1. 或者,也许LATEST_BY_OFFSET可以用于捕获每列的最新值:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, LATEST_BY_OFFSET(serialnumber), LATEST_BY_OFFSET(mfgdate), LATEST_BY_OFFSET(productname), LATEST_BY_OFFSET(customerid), LATEST_BY_OFFSET(locationid), LATEST_BY_OFFSET(macaddress), LATEST_BY_OFFSET(installationdate) 
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES) 
GROUP BY sensorid;

LAST_BY_OFFSET 仅在几个版本前引入,因此您可能需要更新。

希望这两个选项将帮助您到达您需要的地方。


推荐阅读