首页 > 解决方案 > Spark SQL 窗口超过两个指定时间边界之间的间隔 - 3 小时到 2 小时前

问题描述

使用两个预定义边界在 Spark SQL 中指定窗口间隔的正确方法是什么?

我试图在“3 小时前到 2 小时前”的窗口中汇总表中的值。

当我运行此查询时:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;

这样可行。我得到了我期望的结果,即落入 2 小时滚动窗口的值的总和。

现在,我需要的是让滚动窗口不绑定到当前行,而是考虑 3 小时前和 2 小时前之间的行。我试过:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;

但我得到extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'}错误。

我也尝试过:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;

但后来我得到不同的错误scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)

我尝试的第三个选项是:

select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;

它并没有像我们预期的那样工作:cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch

我很难找到间隔类型的文档,因为这个链接说得不够多,而且其他信息有点半生不熟。至少我发现了什么。

标签: apache-sparkapache-spark-sqlwindow-functions

解决方案


由于范围间隔不起作用,我不得不转向另一种方法。它是这样的:

  • 准备需要执行计算的区间列表
  • 对于每个间隔,运行计算
    • 这些迭代中的每一个都会产生一个数据框
  • 迭代后,我们有一个数据框列表
  • 将列表中的数据框合并为一个更大的数据框
  • 写出结果

在我的例子中,我必须在一天中的每个小时运行计算并将这些“每小时”结果(即 24 个数据帧的列表)组合成一个“每日”数据帧。

从非常高级的角度来看,代码如下所示:

val hourlyDFs = for ((hourStart, hourEnd) <- (hoursToStart, hoursToEnd).zipped) yield {
    val data = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
    // do stuff
    // return a data frame
}
hourlyDFs.toSeq().reduce(_.union(_))

推荐阅读