apache-kafka - KSQL 窗口化聚合流
问题描述
我正在尝试使用KSQL Windowed Aggregation,特别是Session Window按其属性之一和随着时间的推移对事件进行分组。
我有一个STREAM
由 kafka 主题制成的,具有TIMESTAMP
明确指定的属性。
当我尝试STREAM
使用以下查询创建带有会话窗口的会话时:
CREATE STREAM SESSION_STREAM AS
SELECT ...
FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
GROUP BY ...;
我总是得到错误:
您的 SELECT 查询会生成一个 TABLE。请改用 CREATE TABLE AS SELECT 语句。
STREAM
是否可以使用窗口聚合创建一个?
当我按照建议尝试创建一个TABLE
,然后创建一个STREAM
包含所有会话开始事件的 a 时,使用如下查询:
CREATE STREAM SESSION_START_STREAM AS
SELECT *
FROM SESSION_TABLE
WHERE WINDOWSTART=WINDOWEND;
KSQL 告诉我:
KSQL 不支持对窗口表的持久查询
如何STREAM
在 KSQL 中创建一个启动会话窗口的事件?
解决方案
如果切换到 create table 语句,您的 create stream 语句将创建一个不断更新的表。接收器主题SESSION_STREAM
将包含对表的更改流,即其更改日志。
ksqlDB 将其建模为 TABLE,因为它具有 TABLE 语义,即表中只能存在具有任何特定键的单行。但是,更改日志将包含已应用于表的更改的流。
如果您想要的是包含所有会话的主题,那么类似这样的内容将创建:
-- create a stream with a new 'data' topic:
CREATE STREAM DATA (USER_ID INT)
WITH (kafka_topic='data', value_format='json');
-- create a table that tracks user interactions per session:
CREATE TABLE SESSION AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
FROM DATA
WINDOW SESSION (5 SECONDS)
GROUP BY USER_ID;
这将创建一个SESSIONS
包含对SESSIONS
表的更改的主题:即其更改日志。
如果您想将其转换为会话开始事件流,那么不幸的是 ksqlDB还不允许您直接更改从表创建流,但您可以在表的更改日志上创建流:
-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS
SELECT * FROM SESSION_STREAM
WHERE WINDOWSTART = WINDOWEND;
请注意,在即将发布的 0.10 版本中,您将能够SESSION_STREAM
正确命名键列:
CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT)
WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');
推荐阅读
- r - 如何将外生输入添加到 nnetTs 函数?
- java - Kafka AVRO消费者端如何丢弃AVRO数据的默认值字段?
- javascript - 如何动态创建属性并为其添加值?
- delphi - 将 PNG 资源加载到 TGPImage
- google-app-engine - 如何在 Java 8 模式下运行 IntelliJ 的 Google App Engine Dev Server 插件
- apache-flink - Apache flink - 阅读后将文件移动到不同的文件夹
- vue.js - vue.js pass function to component as prop
- javascript - Display several images with vuejs
- typescript - Typescript Promise 文件上传
- php - How to show multiple images by ajax success