java - 迭代 GraphFrames AggregateMessages 达到内存限制
问题描述
我正在使用 GraphFrame 的 aggregateMessages 功能来构建自定义聚类算法。我在一个小样本数据集(约 100 个项目)上测试了这个算法,并验证了它的工作原理。但是,当我在包含 50k 个项目的真实数据集上运行此程序时,大约 10 次迭代后出现 OOM 错误。有趣的是,前几次迭代在几分钟内处理完毕,而 mem 是正常范围。在第 6 次迭代之后,mem 使用率攀升至 ~30GB 并最终爆炸。我在 32GB 的 2 节点集群 16 核上运行它。
由于这是一个迭代算法,而且每次迭代后的内存只会增加,我想知道我是否需要以某种方式释放内存。我在循环末尾添加了 unpersist 块,但这没有帮助。
还有其他我可以使用的效率吗?是否有在迭代设置中使用 GraphFrames 的最佳实践?
我注意到的另一件事是,在执行程序页面的 spark UI 上,使用的“存储内存”大约为 300MB,但 spark 进程实际上占用了大约 30GB。不确定这是否是内存泄漏!
while ( true ) {
System.out.println("["+new Date()+"] Running " + i);
Dataset<Row> lastRoutesDs = groups;
Dataset<Row> groupUnwind = groups.withColumn("id", explode(col("routeItems")));
GraphFrame gf = new GraphFrame(groupUnwind, edgesDs);
Dataset<Row> lvl1 = gf.aggregateMessages()
.sendToSrc(when(
callUDF("contains_in_array_str", AggregateMessages.dst().getField("routeItems"),
AggregateMessages.src().getField("id")).equalTo(false),
struct(AggregateMessages.dst().getField("routeItems").as("routeItems"),
AggregateMessages.dst().getField("routeScores").as("routeScores"),
AggregateMessages.dst().getField("grpId").as("grpId"),
AggregateMessages.dst().getField("grpScore").as("grpScore"),
AggregateMessages.edge().getField("score").as("edgeScore"))))
.agg(collect_set(AggregateMessages.msg()).as("incomings"))
.withColumn("inItem", explode(col("incomings")))
.groupBy("id", "inItem.grpId")
.agg(first("inItem.routeItems").as("routeItems"), first("inItem.routeScores").as("routeScores"),
first("inItem.grpScore").as("grpScore"), collect_list("inItem.edgeScore").as("inScores"))
.groupBy("grpId")
.agg(bestRouteAgg.apply(col("routeItems"), col("routeScores"), col("inScores"), col("grpScore"),
col("id"), col("grpScore")).as("best"))
.withColumn("newScore", callUDF("calcRouteScores", expr("size(best.routeItems)+1"),
col("best.routeScores"), col("best.inScores")))
.withColumn("edgeCount", expr("size(best.routeScores)"))
.persist(StorageLevel.MEMORY_AND_DISK());
lvl1
.filter("newScore > " + groupMaxScore)
.withColumn("itr", lit(i))
.select("grpId", "best.routeItems","best.routeScores", "best.grpScore", "edgeCount", "itr")
.write()
.mode(SaveMode.Append)
.json(workspaceDir + "clusters-rank-collect");
if (lvl1.count() == 0) {
System.out.println("****** End reached " + i);
break;
}
Dataset<Row> newGroups = lvl1.filter("newScore <= " + groupMaxScore)
.withColumn("routeItems_new",
callUDF("merge2Array", col("best.routeItems"), array(col("best.newNode"))))
.withColumn("routeScores_new",
callUDF("merge2ArrayDouble", col("best.routeScores"), col("best.inScores")))
.select(col("grpId"), col("routeItems_new").as("routeItems"),
col("routeScores_new").as("routeScores"), col("newScore").as("grpScore"));
if (i > 0 && (i % 2) == 0) {
newGroups = newGroups
.checkpoint();
}
newGroups = newGroups
.persist(StorageLevel.DISK_ONLY());
System.out.println( newGroups.count() );
groups.unpersist();
lastRoutesDs.unpersist();
groupUnwind.unpersist();
lvl1.unpersist();
groups = newGroups;
i++;
}
解决方案
推荐阅读
- c++ - 函数被多次调用
- c++ - 带有mat.at的opencv C++中的矩阵赋值错误
(i,j) - java - 检查文本文件中的项目是否为 JSON 格式 Android Studio
- typescript - TypeScript 可区分联合在切换后无法缩小类型
- python - 使用 lmfit 创建更精细的复合模型
- mysql - 无法将 MySql 查询(ManyToMany 关系)转换为 HQL
- python - SKlearn 导入错误
- php - 字段不接受默认值 mysql
- asp.net-mvc - 将数据作为参数从 MVC 应用程序传递给逻辑应用程序
- arrays - Postgres 字符串数组比较