首页 > 解决方案 > 派斯帕克。内存不足的问题。如何确保表被覆盖

问题描述

我目前尝试了解 Spark 计算的过程以及对内存消耗的影响。

我正在 Zeppelin 中使用 Spark 2.3.2 和 Python 2.7。

基本上在下面的循环中,我正在创建集合。我正在使用 sci-kit-learn 构建机器学习模型,并且在 sci-kit-learn 计算之后,我正在对 pyspark-dataframes 进行大量数据帧操作。对于每个我,我都会得到一个表 rsmeMaeStep,它有 8 行和 10 列,带有小字符串或双精度值。rsmeMaeAll 只是将单个分析加在一起,i=26 有 8*26 =208 行和 10 列。

for i in range(26):
    df_features_train, df_features_validation = randomizer(dataFiltered)
    rsmeMaeStep, rsmeMaeAll = rsmeMaeAnalysis(rsmeMaeAll,df_features_train,df_features_test)
    print(i)

我对代码做了一些时间分析。对于 i=1,对于 i=10 需要 17 秒:对于 i =26,需要 2:40 mnutes 需要 6:42。(即 10 或 26 个循环的 9.4 或 23.6 倍。)到目前为止,一切都符合预期。我在下一步有问题。以下代码应该只是对 8 到 206 行的简单聚合。对于 i=1 需要 32 秒,对于 i=7 4:43(长 8.8 倍),但对于 i=26 我在 47 分钟后有 0% 或者它因内存不足消息而失败。

rsmeMae = rsmeMaeAll.select('set','setting','sme').orderBy('set','setting')
import pyspark.sql.functions as f
rsmeMaeAverage = rsmeMae.groupBy('setting','set').agg(f.count(('setting')).alias('nrOfRand'), f.round(f.mean('sme'),2).alias('rsme'),f.round(f.stddev('sme'),2).alias('sigmaRsme')).orderBy('set','setting')
z.show(rsmeMaeAverage)

根据我认为应该在每个循环中覆盖所有表的逻辑。只有小的 rsmeMaeAll 应该在每个循环中增加一点。但它仍然是一张很小的桌子。

但 Spark 的行为可能有所不同。

据我了解,第一步的 sk-learn 代码在第一步中执行。如果我确实正确理解了 spark 延迟评估,那么当我想打印结果时,我的代码中的 pySpark 操作就会开始执行。因此,Spark 可能会将所有循环的所有表都保存在内存中。是对的吗?

如果我是对的,我需要代码在每个循环结束时直接计算 pySpark 代码。

我该怎么做?

如果我这样做会在下一个循环中触发覆盖表,或者内存消耗是否会随着每个循环而增加?我需要从内存中主动删除表吗?如何?

编辑:我刚刚整合

rsmeMaeStep.collect()
rsmeMaeAll.collect()

进入循环以确保立即完成 pyspark 计算。但是第一个循环需要 55 秒。第 7 次耗时 10 多分钟,49 分钟后在第 8 次循环的 rsmeMaeAll.collect() 处崩溃。带有错误消息:

Py4JJavaError:调用 o13488.collectToPython 时出错。:java.lang.OutOfMemoryError: Java 堆空间

我真的不明白每个循环时间的指数增长。在我至少能够运行 10 个循环之前。那里发生了什么?

标签: pythonapache-sparkmemorymachine-learningpyspark

解决方案


我认为这个问题与 Spark 中的惰性评估有关。由于我收集了所有信息,因此 pyspark 数据帧 rsmeMaeAll 可能在我尝试计算输出时,生成 rsmeMaeAll 所需的所有信息都已同时加载到缓存中。

基于这个想法,我以一种 Spark 不再需要保留所有步骤的方式重新构建代码。此外,我集成了时间测量,并在两个变体中重建旧代码,以使一个变体更接近新逻辑和每个变体,计算必须在每个循环结束时完成。

解决方案如下:

for i in range(9):
    ti0 = time.time()
    df_features_train, df_features_test = randomizer(dataFiltered)
    rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
    rsmeMaeAllpd = rsmeMaeAllpd.append(rsmeMaeStep.toPandas())
    print(rsmeMaeAllpd)
    ti1 = time.time()
    print "Time for loop", i, ":", ti1-ti0

在 rsmeMaeAnalysis 中,我刚刚计算了分析结果,返回它们,将它们转换为 Pandas 数据框,并在 Pandas 中收集所有结果。结果是每个循环都或多或少地花费了相同的时间,即使在 20 个循环之后,我也没有任何记忆问题。前十个循环的时间如下:

41s ,42s ,44s ,40s ,43s ,43s ,40s ,39s ,40s ,40s

但是后来我想确定在 pyspark 数据框中收集结果确实是问题所在,因此我构建了一个尽可能接近熊猫解决方案的代码,但将结果收集在 pyspark 数据框中:

for i in range(10):
    ti0 = time.time()
    df_features_train, df_features_test = randomizer(dataFiltered)
    rsmeMaeStep = rsmeMaeAnalysis(df_features_train,df_features_test)
    rsmeMaeAll = rsmeMaeAll.union(rsmeMaeStep)
    rsmeMaeAll.show(80,False)
    ti1 = time.time()
    print "Time for loop", i, ":", ti1-ti0

前八个循环的时间如下:

43s ,63s ,88s ,144s ,162s ,175s ,212s ,276s

在仅使用时间测量的原始变体中,在第 7 次循环后出现内存不足错误需要以下时间:

44s ,60s ,73s ,98s ,128s ,157s ,198s

最后,延迟评估似乎仍然导致生成 rsmeMaeAll 所需的大量信息同时加载到缓存中,尽管大多数信息在每个循环结束时都不相关。


推荐阅读