首页 > 解决方案 > 如何在 flink sql 表中自动生成水印?

问题描述

我正在测试 flink cep sql 并且我的水印被定义为行时间,我的表是一个 kafka 表。由于 watermark 依赖于所有 kafka 分区中的最小值,所以每条新消息都必须等待 kafka 分区对齐,然后 cep 触发结果。

我的 kafka 表(主题有 3 个分区)定义为

create table test_table(
    agent_id String, room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time as to_timestamp_ltz(create_time, 3), 
    WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)

这是我的cep sql

select * from test_table  match_recognize (
   partition by agent_id,room_id,call_type 
   order by row_time
   measures  
       last(BF.create_time) as create_time, 
       last(AF.connect_time) as connect_time 
   one row per match after match SKIP PAST LAST ROW 
   pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
   define 
       BF as BF.connect_time = 0,
       AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type 
) as T ;

cep sql 触发结果是正确的,但总是迟到,因为每个分区都需要对齐水印。如何立即获得最新结果或在 flink sql 表中自动生成水印

标签: apache-flinkflink-sqlflink-cep

解决方案


您的模式是要求connect_time > 0在行 where 之后立即找到一行connect_time = 0(其中两行具有相同的 room_id 和 call_type)。为了让这种模式匹配完全正确,有必要等待水印。否则,过早的匹配可能会因乱序事件的到来而失效——例如,connect_time < 0恰好在 AF 之前的事件。(你可能知道这是不可能的,但 cep/sql 引擎不知道。)

如果您愿意放宽模式匹配语义,为什么不将此 MATCH_RECOGNIZE 查询替换为间隔连接(具有时间约束的自连接)。有关详细信息,请参阅https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins

BTW,这部分的定义AF

... and BF.room_id = AF.room_id and BF.call_type = AF.call_type

没有任何效果,因为流已经被room_id和分区了call_type


推荐阅读