首页 > 解决方案 > 我可以在 KSQL 中从表转换为流吗?

问题描述

我正在使用 KSQL 在 kafka 中工作。我想在 5 分钟内找出不同 DEV_NAME(ROWKEY) 中的最后一行。因此,我创建了流和聚合表以供进一步加入。

通过下面的 KSQL,我创建了表格,用于在 5 分钟内找出不同 DEV_NAME 的最后一行

CREATE TABLE TESTING_TABLE  AS
SELECT  ROWKEY AS DEV_NAME, max(ROWTIME) as LAST_TIME 
    FROM TESTING_STREAM WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY ROWKEY;

然后,我想一起加入:

CREATE STREAM TESTING_S_2 AS 
  SELECT *
    FROM TESTING_S  S
        INNER JOIN TESTING_T T
        ON    S.ROWKEY = T.ROWKEY
    WHERE  
    S.ROWTIME = T.LAST_TIME;

但是,它发生了错误:

Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.streams.kstream.TimeWindowedSerializer) is not compatible to the actual key type (key type: org.apache.kafka.connect.data.Struct). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

应该是WINDOW TUMBLING函数改变了我的ROWKEY风格

(e.g. DEV_NAME_11508 -> DEV_NAME_11508 : Window{start=157888092000 end=-}       

因此,在不设置 Serdes 的情况下,我可以从表转换为流并设置 PARTITION BY DEV_NAME 吗?

标签: apache-kafkastreamingksqldb

解决方案


正如您所确定的,问题在于您的表格是窗口表格,这意味着表格的键是窗口的,您无法使用非窗口键查看窗口表格。

就目前而言,您的表将为每个 5 分钟窗口的每个 ROWKEY 生成一个唯一行。然而,除了最近的窗口之外,您似乎什么都不关心。可能是您不需要表格中的窗口,例如

CREATE TABLE TESTING_TABLE AS 
   SELECT 
     ROWKEY AS DEV_NAME, 
     max(ROWTIME) as LAST_TIME  
   FROM TESTING_STREAM 
   WHERE ROWTIME > (UNIX_TIMESTAMP() - 300000) 
   GROUP BY ROWKEY;

将跟踪每个键的最大时间戳,忽略任何超过 5 分钟的时间戳。(当然,此检查仅在收到事件时进行,5 分钟后不会删除该行)。

另外,这个加入:

CREATE STREAM TESTING_S_2 AS 
  SELECT *
    FROM TESTING_S  S
        INNER JOIN TESTING_T T
        ON    S.ROWKEY = T.ROWKEY
    WHERE  
    S.ROWTIME = T.LAST_TIME;

由于竞争条件,几乎可以肯定没有按照您的想法进行操作并且不会按照您想要的方式工作。

目前尚不清楚您要达到的目标。添加有关源数据和所需输出的更多信息可能会帮助人们为您提供解决方案。


推荐阅读