apache-spark - Using head(1) on a DataFrame causes a Spark timeout exception
Problem description
I am running a simple Spark/Scala snippet:
val df = spark.read.json("/home/files/data/date_20200811.json")
df.persist
if (!df.head(1).isEmpty) {
  val validDF = df.where("status = 'OLD'")
  validDF.write.json("/home/files/result")
} else {
  println("No data found")
}
When I run this code, it throws the following exception:
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:100)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:89)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1596)
However, if I replace df.head(1).isEmpty with df.count > 0, it works perfectly.
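For reference, a minimal sketch of the common ways to test a Dataset for emptiness; take(1) and limit(1).count are standard Dataset methods, and isEmpty is available on Dataset from Spark 2.4 onward. The df here is the DataFrame from the snippet above:

// Each check only needs to materialize at most one row, so all three
// are normally much cheaper than a full df.count over the data.
val nonEmpty1 = df.take(1).nonEmpty      // same cost profile as head(1)
val nonEmpty2 = df.limit(1).count() > 0  // caps the scan at a single row
val nonEmpty3 = !df.isEmpty              // Dataset.isEmpty, Spark 2.4+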
Solution
It may be a coincidence, but are you sure this code is the real culprit behind the error? I think something is missing from the picture.
Look at line 7 of the stack trace in your error:
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
It means that somewhere a DataFrame is being broadcast for a join, and that broadcast did not complete within the default 300 seconds of spark.sql.broadcastTimeout.
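If that is what is happening, the usual remedies are to give the broadcast more time or to disable automatic broadcast joins altogether. A minimal sketch, using only standard Spark SQL configuration keys; the application name and the 600-second value are illustrative:

import org.apache.spark.sql.SparkSession

// Illustrative sketch: both keys below are standard Spark SQL settings.
val spark = SparkSession.builder()
  .appName("broadcast-timeout-example")
  // Allow broadcasts up to 600 s instead of the 300 s default.
  .config("spark.sql.broadcastTimeout", "600")
  .getOrCreate()

// Alternatively, setting the broadcast threshold to -1 disables
// automatic broadcast joins, so Spark falls back to a sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

Raising the timeout keeps the (usually faster) broadcast join; disabling it is the safer choice when the broadcast side is too large to ship to every executor.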