Using head(1) on a DataFrame causes a Spark timeout exception

Problem description

I am running a simple Spark/Scala snippet:

val df = spark.read.json("/home/files/data/date_20200811.json")
df.persist

if (!df.head(1).isEmpty) {
  // the string literal needs quotes inside the SQL expression
  val validDF = df.where("status = 'OLD'")
  validDF.write.json("/home/files/result")
} else {
  println("No data found")
}

When I run this code, it throws the following exception:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:100)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:89)
        at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
        at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1596)

However, if I replace df.head(1).isEmpty with df.count > 0, it works perfectly.

Tags: apache-spark, pyspark, apache-spark-sql

Solution


This may be a coincidence. Are you sure this snippet is the real culprit behind the error? I think something is missing.

Look at line 7 of your stack trace: at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)

This means a DataFrame is being broadcast somewhere for a join, and that broadcast did not complete within the default spark.sql.broadcastTimeout of 300 seconds.
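If the broadcast join really is the bottleneck, the usual workarounds are to raise the timeout or to stop Spark from broadcasting automatically. A minimal sketch of both options follows; the 600-second value is only an illustrative choice, and you would use your existing spark session rather than building a new one:

import org.apache.spark.sql.SparkSession

// Hypothetical session for illustration; reuse your existing `spark` session in practice.
val spark = SparkSession.builder().appName("broadcast-timeout-demo").getOrCreate()

// Option 1: allow the broadcast more time than the default 300 seconds.
spark.conf.set("spark.sql.broadcastTimeout", "600")

// Option 2: disable automatic broadcast joins entirely, so Spark falls back
// to a shuffle-based join instead of broadcasting one side of the join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

Raising the timeout only hides a slow broadcast; if one side of the join is not actually small, disabling auto-broadcast is usually the safer choice.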

