首页 > 解决方案 > 使用 Flink SQL API 处理事件

问题描述

我的用例 - 收集特定持续时间的事件,然后根据键对它们进行分组

Objective 处理后,用户可以根据key保存特定时长的数据

我打算怎么做 1)从 Kafka 接收事件

2)创建事件数据流

3)通过运行 SQL 查询将表与其关联并收集特定持续时间的数据

4)将新表与第二步输出相关联,并根据键对收集的数据进行分组

5)将数据保存在数据库中

我试过的解决方案-

我能够-

1)从卡夫卡接收事件,

2)设置数据流(比如说sensorDataStream)-

DataStream<SensorEvent> sensorDataStream 
         = source.flatMap(new FlatMapFunction<String, SensorEvent>() {
            @Override
            public void flatMap(String catalog, Collector<SensorEvent> out) {
            // create SensorEvent(id, sensor notification value, notification time) creation
             });

3)将表(比如说table1)与数据流相关联,并在运行SQL查询之后 -

SELECT id, sensorNotif, notifTime FROM SENSORTABLE WHERE notifTime > t1_Timestamp AND notifTime < t2_Timestamp.

这里 t1_Timestamp 和 t2_Timestamp 是预定义的纪元时间,会根据一些预定义的条件而改变

4)我可以通过在控制台上使用以下查询来打印此 sql 查询结果-

tableEnv.toAppendStream(table1, Row.class).print();

5)使用table1和以下类型的sql查询创建了一个新表(比如说table2)-

Table table2 = tableEnv.sqlQuery("SELECT id AS SensorID, COUNT(sensorNotif) AS SensorNotificationCount FROM table1 GROUP BY id);

6)通过使用收集和打印数据 -

tableEnv.toRetractStream(table2 , Row.class).print();

问题

1)我无法在控制台上看到第 6 步的输出。

我做了一些实验,发现如果我跳过 table1 设置步骤(这意味着一段时间内没有传感器数据俱乐部)并将我的 senserDataStream 与 table2 直接关联,那么我可以看到第 6 步的输出,但因为这是 RetractStream 所以我可以看到形式的数据,如果新事件即将到来,则此撤回流将使数据无效并打印新计算的数据。

我想要的建议

1)如何合并步骤 5 和步骤 6(表示表 1 和表 2)。我已经合并了这些表,但是由于控制台上看不到数据,所以我有疑问?难道我做错了什么?还是数据已合并但不可见?

2)我的计划是——

2.a) 在 2 遍中过滤数据,在第一遍中过滤特定时间间隔的数据,在第二遍中将数据分组

2.b)在数据库中保存 2.a 输出 这种方法是否有效(我有疑问,因为我正在使用数据流并且 table1 输出是附加流但 table2 输出是撤回流)?

标签: apache-flinkflink-streamingflink-sql

解决方案


推荐阅读