首页 > 解决方案 > 如何使用 flink cep 完成聚合任务

问题描述

我需要计算一天中发生 A 的次数以及在 15 分钟内发生 B 的次数。流可能是 A1 ,A2,B1,B2,A3,B3,B4,B5,A4,A5,A6,A7,B6 。在我的情况下,事件结果是A2,B1 A3,B3 A7,B6。当匹配器发生时我需要接收实时结果。我已经厌倦了一些东西。我认为这只能通过使用flink cep才能实现。但是flink- sql-cep 不支持聚合。它只计算事件发生。在这种情况下,如何用一条 SQL 完成这个任务。

我累了两步。我先用flink sql cep to matcher,然后sink to kafka。在一步中,我使用 pre kafka 并使用窗口聚合。

第一步: select pins as pin,'first-step' as result_id, cast(order_amount as varchar) as result_value,event_time as result_time from stra_dtpipeline MATCH_RECOGNIZE ( PARTITION BY pin
ORDER BY event_time MEASURES
t1.pin as pins, '1' as order_amount , LOCALTIMESTAMP as event_time ONE ROW PER MATCH AFTER MATCH SKIP to next row PATTERN (t1 t2) WITHIN INTERVAL '30' SECOND
DEFINE
t1 as t1.act_type='100001' , t2 as t2.act_type='100002' ) 第二步:select pin,'job5' as result_id,cast(sum(1) over (PARTITION BY pin,cast(DATE_FORMAT(event_time, '%Y%m%d') as VARCHAR) order by event_time ROWS BETWEEN INTERVAL '1' DAY PRECEDING AND CURRENT ROW ) as VARCHAR) as result_value, CURRENT_TIMESTAMP as result_time from stra_dtpipeline_mid where result_id='first-step' and DAYOFMONTH(CURRENT_DATE )=DAYOFMONTH(event_time)

我希望用一个 SQL 完成这项任务。

标签: apache-flinkflink-cepflink-sql

解决方案


您可以使用子查询或视图将这两个查询组合成一个查询。

那会是这样的

SELECT a, b OVER (...) ORDER BY event_time FROM (SELECT x, y MATCH_RECOGNIZE ...) WHERE ...

或者

CREATE VIEW pattern AS SELECT x, y MATCH_RECOGNIZE ...
SELECT ... FROM pattern WHERE ...

推荐阅读