apache-kafka - 我可以在 KSQL 中从表转换为流吗?
问题描述
我正在使用 KSQL 在 kafka 中工作。我想在 5 分钟内找出不同 DEV_NAME(ROWKEY) 中的最后一行。因此,我创建了流和聚合表以供进一步加入。
通过下面的 KSQL,我创建了表格,用于在 5 分钟内找出不同 DEV_NAME 的最后一行
CREATE TABLE TESTING_TABLE AS
SELECT ROWKEY AS DEV_NAME, max(ROWTIME) as LAST_TIME
FROM TESTING_STREAM WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY ROWKEY;
然后,我想一起加入:
CREATE STREAM TESTING_S_2 AS
SELECT *
FROM TESTING_S S
INNER JOIN TESTING_T T
ON S.ROWKEY = T.ROWKEY
WHERE
S.ROWTIME = T.LAST_TIME;
但是,它发生了错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to the actual key type (key type: org.apache.kafka.connect.data.Struct). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
应该是WINDOW TUMBLING函数改变了我的ROWKEY风格
(e.g. DEV_NAME_11508 -> DEV_NAME_11508 : Window{start=157888092000 end=-}
因此,在不设置 Serdes 的情况下,我可以从表转换为流并设置 PARTITION BY DEV_NAME 吗?
解决方案
正如您所确定的,问题在于您的表格是窗口表格,这意味着表格的键是窗口的,您无法使用非窗口键查看窗口表格。
就目前而言,您的表将为每个 5 分钟窗口的每个 ROWKEY 生成一个唯一行。然而,除了最近的窗口之外,您似乎什么都不关心。可能是您不需要表格中的窗口,例如
CREATE TABLE TESTING_TABLE AS
SELECT
ROWKEY AS DEV_NAME,
max(ROWTIME) as LAST_TIME
FROM TESTING_STREAM
WHERE ROWTIME > (UNIX_TIMESTAMP() - 300000)
GROUP BY ROWKEY;
将跟踪每个键的最大时间戳,忽略任何超过 5 分钟的时间戳。(当然,此检查仅在收到事件时进行,5 分钟后不会删除该行)。
另外,这个加入:
CREATE STREAM TESTING_S_2 AS
SELECT *
FROM TESTING_S S
INNER JOIN TESTING_T T
ON S.ROWKEY = T.ROWKEY
WHERE
S.ROWTIME = T.LAST_TIME;
由于竞争条件,几乎可以肯定没有按照您的想法进行操作并且不会按照您想要的方式工作。
目前尚不清楚您要达到的目标。添加有关源数据和所需输出的更多信息可能会帮助人们为您提供解决方案。
推荐阅读
- reactjs - 我正在尝试将数据推送到 useState 中定义的数组中,但数据没有被推送到数组中
- java - 如何从文件中获取内容uri?
- jquery - Jquery else if 语句不能正常工作
- javascript - 如何使用类型脚本处理 json 对象
- python - python中的滑动窗口改变形状
- c++ - 我想使用 Raspberry Pi 使用 I2C 从 Arduino 读取
- css - Flexbox:子项不占父项的 100%,仅占用其最小宽度
- java - 无法使用 Apache Commons CSV 从 CSV 文件中解析最终列
- python - Moviepy剪辑改变尺寸?
- node.js - 在 PEAN github 克隆项目上指定的令牌无效