Why does increasing spark.sql.shuffle.partitions cause a FetchFailedException?

Problem Description

I get a FetchFailedException when joining tables with spark.sql.shuffle.partitions = 2700.

But the same job runs successfully with spark.sql.shuffle.partitions = 500.

As far as I know, increasing shuffle.partitions should reduce the amount of data each task handles during the shuffle read.

Am I missing something?
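For illustration only, here is a minimal sketch of this kind of job; the table names fact and dim and the key column id are hypothetical and not taken from the question.

import org.apache.spark.sql.SparkSession

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shuffle-partitions-sketch").getOrCreate()

    // 2700 reduce-side partitions reproduces the FetchFailedException; 500 does not.
    spark.conf.set("spark.sql.shuffle.partitions", "2700")

    // Shuffle join of two hypothetical tables on a hypothetical key column "id".
    val joined = spark.table("fact").join(spark.table("dim"), "id")
    joined.write.mode("overwrite").saveAsTable("fact_dim_joined")

    spark.stop()
  }
}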

Exception

FetchFailed(BlockManagerId(699, nfjd-hadoop02-node120.jpushoa.com, 7337, None), shuffleId=4, mapId=59, reduceId=1140, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCode

Configuration

spark.executor.cores = 1
spark.dynamicAllocation.maxExecutors = 800
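For completeness, a hedged sketch of how these two settings could be applied when the session is built; only spark.executor.cores and spark.dynamicAllocation.maxExecutors come from the question, everything else is a placeholder.

import org.apache.spark.sql.SparkSession

object SessionSetup {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation also needs the external shuffle service
    // (spark.shuffle.service.enabled=true) or shuffle tracking to be enabled.
    val spark = SparkSession.builder()
      .appName("join-job")
      .config("spark.executor.cores", "1")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.maxExecutors", "800")
      .getOrCreate()

    spark.stop()
  }
}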

Tags: apache-spark, apache-spark-sql

Solution


After reading the shuffle fetch code, I found what was going on.

The real block written by a ShuffleMapTask was too large to be fetched into memory at once, while the block size the reducer obtains from the driver is only an average: once the number of shuffle partitions exceeds 2000 (spark.shuffle.minNumPartitionsToHighlyCompress), the driver tracks average block sizes rather than exact ones, so with skewed data the reported size is much smaller than the actual size of the largest blocks.
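To make the mismatch concrete, here is a small standalone sketch. All block sizes are invented for illustration; only the 2000-partition threshold and the 2 GiB direct-memory limit come from the question and the stack trace.

object SkewSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-reducer block sizes written by a single ShuffleMapTask:
    // 2699 ordinary blocks plus one heavily skewed block (numbers made up).
    val smallBlocks = Vector.fill(2699)(8L * 1024 * 1024)     // 8 MiB each
    val skewedBlock = 3L * 1024 * 1024 * 1024                 // 3 GiB
    val allBlocks   = smallBlocks :+ skewedBlock

    // With more than 2000 shuffle partitions (spark.shuffle.minNumPartitionsToHighlyCompress),
    // the driver hands the reducer an average block size instead of exact per-block sizes.
    val reportedSize = allBlocks.sum / allBlocks.length

    // The direct-memory cap seen in the stack trace: used 2147483648, max 2147483648.
    val directMemoryCap = 2L * 1024 * 1024 * 1024

    println(f"size reported by the driver (average): ${reportedSize / 1024.0 / 1024.0}%.1f MiB")
    println(f"actual size of the skewed block:       ${skewedBlock / 1024.0 / 1024.0}%.1f MiB")
    println(s"single fetch exceeds the 2 GiB cap:    ${skewedBlock > directMemoryCap}")
  }
}

Because the driver under-reports the skewed block, the fetcher treats it as an ordinary in-memory fetch, and the transfer then fails exactly as the stack trace shows: it cannot allocate more direct memory once the 2 GiB pool is exhausted.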

