Using "Over Aggregation" after a "Group Aggregation" subquery in Flink SQL streaming

Problem description

I am using the data from this link: https://github.com/ververica/sql-training/wiki/Setting-up-the-Training-Environment. I want to find the 10 most frequent routes in each 30-minute window.

First, I counted the rides per route (Group Aggregation):

  SELECT
    T1.Starting_areaId,
    T1.Ending_areaId,
    TUMBLE_END(T1.matchTime, INTERVAL '30' MINUTE) AS End_Time,
    COUNT(T1.Starting_areaId) AS No_Rides
  FROM (
    SELECT * FROM Rides
    MATCH_RECOGNIZE(
      PARTITION BY taxiId, rideId
      ORDER BY rideTime
      MEASURES
        toAreaId(P.lon, P.lat) AS Starting_areaId,
        toAreaId(D.lon, D.lat) AS Ending_areaId,
        MATCH_ROWTIME() AS matchTime
      AFTER MATCH SKIP PAST LAST ROW
      PATTERN(P D)
      DEFINE
        P AS P.isStart = true,
        D AS D.isStart = false
    )
  ) AS T1
  GROUP BY 
    T1.Starting_areaId,
    T1.Ending_areaId,
    TUMBLE(T1.matchTime, INTERVAL '30' MINUTE)

However, when I try to rank the routes by their number of rides with an Over Aggregation, using the following query:

SELECT
  T2.Starting_areaId,
  T2.Ending_areaId,
  T2.End_Time,
  T2.No_Rides,
  RANK() OVER(
    PARTITION BY T2.End_Time
    ORDER BY T2.No_Rides
  ) AS Ranking
FROM (
  SELECT
    T1.Starting_areaId,
    T1.Ending_areaId,
    TUMBLE_END(T1.matchTime, INTERVAL '30' MINUTE) AS End_Time,
    COUNT(T1.Starting_areaId) AS No_Rides
  FROM (
    SELECT * FROM Rides
    MATCH_RECOGNIZE(
      PARTITION BY taxiId, rideId
      ORDER BY rideTime
      MEASURES
        toAreaId(P.lon, P.lat) AS Starting_areaId,
        toAreaId(D.lon, D.lat) AS Ending_areaId,
        MATCH_ROWTIME() AS matchTime
      AFTER MATCH SKIP PAST LAST ROW
      PATTERN(P D)
      DEFINE
        P AS P.isStart = true,
        D AS D.isStart = false
    )
  ) AS T1
  GROUP BY 
    T1.Starting_areaId,
    T1.Ending_areaId,
    TUMBLE(T1.matchTime, INTERVAL '30' MINUTE)
) AS T2

I get this error:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.

Tags: apache-flink, flink-streaming, flink-sql

Solution
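
Going by the error text, a streaming OVER window must be ordered by a time attribute, whereas the query above orders by No_Rides, which is a plain count. One possible workaround is Flink's Top-N query pattern: ROW_NUMBER() in an OVER clause, wrapped in an outer query that filters on the row number. The following is only a sketch. It assumes a Flink version whose planner supports the Top-N pattern (older planners do not), and the view name RidesPerRoute and the column row_num are hypothetical names introduced here. It also swaps RANK() for ROW_NUMBER(), so routes with equal counts receive distinct positions.

```sql
-- Hypothetical view wrapping the group aggregation from the question.
CREATE VIEW RidesPerRoute AS
SELECT
  T1.Starting_areaId,
  T1.Ending_areaId,
  TUMBLE_END(T1.matchTime, INTERVAL '30' MINUTE) AS End_Time,
  COUNT(T1.Starting_areaId) AS No_Rides
FROM (
  SELECT * FROM Rides
  MATCH_RECOGNIZE(
    PARTITION BY taxiId, rideId
    ORDER BY rideTime
    MEASURES
      toAreaId(P.lon, P.lat) AS Starting_areaId,
      toAreaId(D.lon, D.lat) AS Ending_areaId,
      MATCH_ROWTIME() AS matchTime
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN(P D)
    DEFINE
      P AS P.isStart = true,
      D AS D.isStart = false
  )
) AS T1
GROUP BY
  T1.Starting_areaId,
  T1.Ending_areaId,
  TUMBLE(T1.matchTime, INTERVAL '30' MINUTE);

-- Top-N pattern: number the routes inside each window by descending ride
-- count, then keep the first 10. The filter on row_num in the outer query
-- is what lets the planner treat this as a Top-N query rather than a
-- general OVER aggregation.
SELECT
  Starting_areaId,
  Ending_areaId,
  End_Time,
  No_Rides,
  row_num
FROM (
  SELECT
    Starting_areaId,
    Ending_areaId,
    End_Time,
    No_Rides,
    ROW_NUMBER() OVER (
      PARTITION BY End_Time
      ORDER BY No_Rides DESC
    ) AS row_num
  FROM RidesPerRoute
)
WHERE row_num <= 10;
```

The ordering is DESC so that the most frequent routes come first; the original query ordered by No_Rides ascending, which would rank the least frequent routes at the top.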

