首页 > 解决方案 > 使用大量窗口函数(滞后、领先)导致内存不足

问题描述

我需要使用多个领先和滞后从数据集中计算其他特征。大量的超前和滞后会导致内存不足错误。

数据框:

|----------+----------------+---------+---------+-----+---------|
| DeviceID | Timestamp      | Sensor1 | Sensor2 | ... | Sensor9 |
|----------+----------------+---------+---------+-----+---------|
|          |                |         |         |     |         |
| Long     | Unix timestamp | Double  | Double  |     | Double  |
|          |                |         |         |     |         |
|----------+----------------+---------+---------+-----+---------|

窗口定义:

// Each window contains about 600 rows
val w = Window.partitionBy("DeviceID").orderBy("Timestamp") 

计算额外特征:

var res = df
val sensors = (1 to 9).map(i => s"Sensor$i")

for (i <- 1 to 5) {
  for (s <- sensors) {
    res = res.withColumn(lag(s, i).over(w))
         .withColumn(lead(s, i)).over(w)
  }

  // Compute features from all the lag's and lead's
  [...]
}

系统信息:

RAM: 16G
JVM heap: 11G

该代码在小数据集上给出了正确的结果,但在输入数据为 10GB 时给出了内存不足的错误。我认为罪魁祸首是大量的窗口函数,因为 DAG 显示了一个很长的序列

Window -> WholeStageCodeGen -> Window -> WholeStageCodeGen ...

反正有没有以更有效的方式计算相同的特征?例如,是否可以在不调用 lag(..., 1) 九次的情况下得到 lag(Sensor1, 1), lag(Sensor2, 1), ..., lag(Sensor9, 1)?

如果上一个问题的答案是否定的,那么如何避免内存不足?我已经尝试过增加分区的数量。

标签: apache-sparkwindow-functions

解决方案


你可以尝试类似的东西

res = res.select('*', lag(s"Sensor$1", 1).over(w), lag(s"Sensor$1", 2).over(w), ...)

也就是说,将所有内容都写成 aselect而不是 manywithColumn

那么计划中将只有 1 个窗口。也许它有助于性能。


推荐阅读