首页 > 解决方案 > Scala Spark/Databricks:.cache() 不会阻止重新计算

问题描述

这涉及一些复杂性,我可能不清楚一些基础知识。开始:

据我了解,火花有“转换”和“行动”。转换会懒惰地建立对您想要做的事情的描述,然后采取行动使其发生。这可以提高性能(允许优化计划),或者如果您在单个数据帧上使用多个操作,可能会导致重复工作,从而导致转换重复触发。为了避免这种情况, .cache() 告诉 Spark 实际上“保存它的工作”,所以你调用它的数据帧不应该继续被重新计算。

我的问题是它似乎没有这样做。我有一个函数“Foo”,它进行大量计算以产生一个(非常小的)数据帧。Foo 跑得很快,我可以显示结果。我有另一个函数“Bar”,它对数据框执行一系列操作。Bar 在(大)原始输入上运行得很快,但在 foo 的输出上运行得非常慢,甚至缓存和合并。我还可以通过将 foo 的输出写入磁盘然后重新读取它来“强制”缓存,此时 bar 会快速运行:

display(bar(bigDF)) //Fast!

val profile = foo(bigDF).coalesce(1).cache()
display(profile) //Also fast! (and profile only has 2 rows, ~80 columns)

display(bar(profile)) //Slow!

profile
  .write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("filename.csv")
val dumb = spark.read.format("csv").option("header", "true").load("filename.csv")
display(bar(dumb)) //Fast again

对我来说,这说明 .cache() 没有像我认为的那样工作 - 缓慢的调用反复重新调用 foo 中的转换,除非我将它写入磁盘并强制它“忘记”它的历史. 有人可以解释我错过了什么吗?

标签: scalaapache-sparkdatabricks

解决方案


cache正在做你所期望的,似乎发生了一些奇怪的事情。

我希望这coalesce(1)是问题所在,请尝试将其保留并测试它是否运行得更快。可能是它破坏了bar.

如果没有任何帮助,请尝试使用它来checkpoint代替cache它可能是查询计划非常长且复杂,checkpoint会截断它(它写入磁盘)

为了进一步分析,您需要进入 SparkUI 来分析作业


推荐阅读