scala - Scala Spark/Databricks:.cache() 不会阻止重新计算
问题描述
这涉及一些复杂性,我可能不清楚一些基础知识。开始:
据我了解,火花有“转换”和“行动”。转换会懒惰地建立对您想要做的事情的描述,然后采取行动使其发生。这可以提高性能(允许优化计划),或者如果您在单个数据帧上使用多个操作,可能会导致重复工作,从而导致转换重复触发。为了避免这种情况, .cache() 告诉 Spark 实际上“保存它的工作”,所以你调用它的数据帧不应该继续被重新计算。
我的问题是它似乎没有这样做。我有一个函数“Foo”,它进行大量计算以产生一个(非常小的)数据帧。Foo 跑得很快,我可以显示结果。我有另一个函数“Bar”,它对数据框执行一系列操作。Bar 在(大)原始输入上运行得很快,但在 foo 的输出上运行得非常慢,甚至缓存和合并。我还可以通过将 foo 的输出写入磁盘然后重新读取它来“强制”缓存,此时 bar 会快速运行:
display(bar(bigDF)) //Fast!
val profile = foo(bigDF).coalesce(1).cache()
display(profile) //Also fast! (and profile only has 2 rows, ~80 columns)
display(bar(profile)) //Slow!
profile
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("filename.csv")
val dumb = spark.read.format("csv").option("header", "true").load("filename.csv")
display(bar(dumb)) //Fast again
对我来说,这说明 .cache() 没有像我认为的那样工作 - 缓慢的调用反复重新调用 foo 中的转换,除非我将它写入磁盘并强制它“忘记”它的历史. 有人可以解释我错过了什么吗?
解决方案
cache
正在做你所期望的,似乎发生了一些奇怪的事情。
我希望这coalesce(1)
是问题所在,请尝试将其保留并测试它是否运行得更快。可能是它破坏了bar
.
如果没有任何帮助,请尝试使用它来checkpoint
代替cache
它可能是查询计划非常长且复杂,checkpoint
会截断它(它写入磁盘)
为了进一步分析,您需要进入 SparkUI 来分析作业
推荐阅读
- android - 为在模拟器上运行的系统 Android 应用程序制作自定义设置向导
- node.js - 在 keycloak 中,如何仅使用状态代码进行响应而不是重定向到登录页面
- reactjs - React hooks - 如何在检查状态和道具的功能组件中实现 shoulComponentUpdate 方法?
- mysql - 根据共享的外国 ID 选择相同的项目
- java - 如何使用嵌套的 json 数组反序列化 json 对象
- c# - 无法在 Web 服务 wcf 中发送或接收大尺寸数组
- java - 我在哪里可以在 Eclipse 中找到这个小部件?
- linux - 非法选项 -p /bin/sh: 1: Usage:: not found
- java - Gigya SDK v4 Facebook 登录问题
- c# - 如何使用功能区上的按钮在资源管理器中对电子邮件进行绿色检查?