首页 > 解决方案 > 如何使用 Flink 找到 30 分钟内销售量大于 1000 的商品?

问题描述

假设数据包含三个字段:itemID、timeStamp、totalQuantityOfSale。

每 5 分钟我要报告前 30 分钟内销售量大于 1000 件的商品。

我想做的是

DataStream<String> process =  stream
                .keyBy(Item::getId)
                .timeWindow(Time.minutes(30), Time.minutes(5))
                .process(new MyProcessFunction());

在这个 MyProcessFunction() 中,我正在考虑存储最后一个数据的 timeStamp 和 totalQuantityOfSale。所以

if(currentData.getTimeStamp()>lastTimeStamp){
    sum+=(currentData.getQuantityOfSale-lastTotalQuantityOfSale);
}

但是,这个 MyProcessFunction 应该扩展需要大量内存的 ProcessWindowFunction。而且我也不知道这种方法是否正确。那么谁能告诉我该怎么做?还有其他更好的解决方案吗?非常感谢!

标签: javaapache-flinkflink-streaming

解决方案


您正在寻找的似乎可以在 Flink SQL 中表示为

SELECT
  id, window_start, window_end, sum(quantityOfSale) AS totalQuantityOfSale
FROM TABLE(
  HOP(TABLE events, DESCRIPTOR(timeStamp), INTERVAL '5' MINUTES, INTERVAL '30' MINUTES))
GROUP BY
  id, window_start, window_end
HAVING sum(quantityOfSale) > 1000;

推荐阅读