apache-kafka - 从 ksqlDB 中的流创建表中连接的 Rowkey
问题描述
流是:
CREATE STREAM SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
我用这个创建的表是:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate, COUNT(*) AS TOTAL
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid, serialnumber, mfgdate, productname, customerid, locationid, macaddress, installationdate;
产生的ROWKEY不是我想要的。
我只想要SENSORID作为行键。
谁能帮我做到这一点。
提前致谢。
PS:我正在使用独立的 Confluent 5.4.0。
解决方案
ksqlDB 将表的主键存储在底层 Kafka 消息的键中。这对于确保重要的事情(例如相同键的一致分区分配和日志压缩)至关重要。
ksqlDB 不支持复合键,尽管这是一个正在开发的功能。因此,与此同时,当您按多列分组时,ksqlDB 会尽其所能并构建您遇到的复合键。不是很好,但它实际上适用于许多用例。
您上面的语句是在主键中创建一个包含许多列的表 - 它们当前都被序列化为单个 STRING 值。
您只要求SENSORID
在键中...但是您的 GROUP BY 子句使所有列在键的一部分之后。
在我看来,您有一个包含传感器更新值流的主题。在这种情况下,我建议研究两个选项:
- 如果输入主题中的每一行都包含每个传感器的所有数据,那么为什么不将其作为 TABLE 而不是 STREAM 导入:
CREATE TABLE SENSORS_KSTREAM (sensorid INT,
serialnumber VARCHAR,
mfgdate VARCHAR,
productname VARCHAR,
customerid INT,
locationid INT,
macaddress VARCHAR,
installationdate VARCHAR)
WITH (KAFKA_TOPIC='SENSORS_DETAILS', VALUE_FORMAT='AVRO', KEY='sensorid');
- 或者,也许
LATEST_BY_OFFSET
可以用于捕获每列的最新值:
CREATE TABLE SENSORS_KTABLE AS
SELECT sensorid, LATEST_BY_OFFSET(serialnumber), LATEST_BY_OFFSET(mfgdate), LATEST_BY_OFFSET(productname), LATEST_BY_OFFSET(customerid), LATEST_BY_OFFSET(locationid), LATEST_BY_OFFSET(macaddress), LATEST_BY_OFFSET(installationdate)
FROM SENSORS_KSTREAM WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY sensorid;
LAST_BY_OFFSET 仅在几个版本前引入,因此您可能需要更新。
希望这两个选项将帮助您到达您需要的地方。
推荐阅读
- c# - 如何检查是否单击了 UI 对象?
- node.js - 如何使新的 elasticsearch node.js 客户端仅在承诺中返回正文?
- firebase - 当原生应用程序处于“后台”时,是否应该删除 Firebase 侦听器?
- java - 使用从 Eclipse 生成的 Ant 脚本创建可执行 Jar
- networking - 使用 Networkx 在世界上生成六边形格子图
- c++ - 与 C++ 中的水平顺序横向混淆
- django - 如何在 django 中尝试删除模型时查看相关对象
- python - 每个第 N 个字符拆分字符串并使用不同的分隔符将其连接回来
- javascript - 我可以将 js 数组传递到输入字段吗
- vue.js - 元素 UI,使用复选框将 id 传递给表列