apache-spark - 使用大量窗口函数(滞后、领先)导致内存不足
问题描述
我需要使用多个领先和滞后从数据集中计算其他特征。大量的超前和滞后会导致内存不足错误。
数据框:
|----------+----------------+---------+---------+-----+---------|
| DeviceID | Timestamp | Sensor1 | Sensor2 | ... | Sensor9 |
|----------+----------------+---------+---------+-----+---------|
| | | | | | |
| Long | Unix timestamp | Double | Double | | Double |
| | | | | | |
|----------+----------------+---------+---------+-----+---------|
窗口定义:
// Each window contains about 600 rows
val w = Window.partitionBy("DeviceID").orderBy("Timestamp")
计算额外特征:
var res = df
val sensors = (1 to 9).map(i => s"Sensor$i")
for (i <- 1 to 5) {
for (s <- sensors) {
res = res.withColumn(lag(s, i).over(w))
.withColumn(lead(s, i)).over(w)
}
// Compute features from all the lag's and lead's
[...]
}
系统信息:
RAM: 16G
JVM heap: 11G
该代码在小数据集上给出了正确的结果,但在输入数据为 10GB 时给出了内存不足的错误。我认为罪魁祸首是大量的窗口函数,因为 DAG 显示了一个很长的序列
Window -> WholeStageCodeGen -> Window -> WholeStageCodeGen ...
反正有没有以更有效的方式计算相同的特征?例如,是否可以在不调用 lag(..., 1) 九次的情况下得到 lag(Sensor1, 1), lag(Sensor2, 1), ..., lag(Sensor9, 1)?
如果上一个问题的答案是否定的,那么如何避免内存不足?我已经尝试过增加分区的数量。
解决方案
你可以尝试类似的东西
res = res.select('*', lag(s"Sensor$1", 1).over(w), lag(s"Sensor$1", 2).over(w), ...)
也就是说,将所有内容都写成 aselect
而不是 manywithColumn
那么计划中将只有 1 个窗口。也许它有助于性能。
推荐阅读
- mongodb - 如何将 Mongo DB 查询转换为 Mongoengine
- java - 如何在java中更改操作系统时区?
- docker - Dockerfile 用于创建一个图像以在 jenkins 中用作容器代理
- linux - Docker 19.03 中是否有用于访问 docker buildx 的 HTTP api?
- ios - 为什么我的 IBAction 出现“无法识别的选择器”错误?
- c# - 尝试读取 excel 但如果用户删除行会得到 System.ArgumentOutOfRangeException
- sql - 获取sql server中proc所有输出的行数
- selenium - 从不同的包调用java类
- jquery - 发送 csv 文件熔断节点
- java - 使用JPQL JpaRepository从两个表父表ID一对多关系中检索数据