Spark SQL cannot overwrite a large Parquet partition

Problem description

Every time a request comes in to update an entire partition, I overwrite the existing Parquet partition with Apache Spark 2.2. The data is skewed, so some partitions are very large. Initially I do not repartition the dataframe (created from the source data) and try to write it with pseudo Spark SQL like the following:

insert overwrite table mytable partition(a=123, b=234) select c1, c2, c3 from mydataframe where a=123 and b=234

(I could have used a dynamic-partition SQL insert instead, but that does not change anything about the problem.)
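For reference, a rough sketch of what that dynamic-partition variant might look like against the same table (an illustration only, not the exact statement I ran; the Hive settings shown are the ones usually required for fully dynamic partition inserts):

// `spark` is assumed to be a SparkSession with Hive support, and `mydataframe`
// the dataframe built from the source data, registered as a temp view.
mydataframe.createOrReplaceTempView("mydataframe")

// Hive tables typically need these before a fully dynamic partition insert.
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

// The partition columns (a, b) must come last in the select list.
spark.sql("""
  insert overwrite table mytable partition(a, b)
  select c1, c2, c3, a, b
  from mydataframe
  where a = 123 and b = 234
""")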

Now, because this partition holds a large amount of data (more than 5 GB), I get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE, which seems to indicate that a block size exceeded Spark's 2 GB limit. The commonly suggested solution is to increase the number of partitions of the dataframe.
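As I understand it, that suggestion amounts to roughly the following (a sketch only; the count 200 is a placeholder, not a value from my job):

import org.apache.spark.sql.functions.col

// Split the skewed slice into more, smaller Spark partitions so that no single
// shuffle or cached block approaches the 2 GB limit, then write it back.
val slice = mydataframe.filter(col("a") === 123 && col("b") === 234)
val repartitioned = slice.repartition(200)   // placeholder; pick a count that keeps each task well under 2 GB

repartitioned.createOrReplaceTempView("mydataframe_repartitioned")
spark.sql("""
  insert overwrite table mytable partition(a=123, b=234)
  select c1, c2, c3 from mydataframe_repartitioned
""")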

If I increase the number of partitions in the dataframe, then I get

CommitDeniedException: attempt Not committed because the driver did not authorize commit

TaskCommitDenied (Driver denied task commit) for job: 124, partition: 39, attemptNumber: 0

Below is the sequence of exceptions I see in most of the executor logs: a shuffle failure, then a CommitDeniedException, then OutOfMemory errors until the executor dies.

19/03/15 12:42:52 ERROR shuffle.OneForOneBlockFetcher: Failed while starting block fetches

19/03/15 13:10:22 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190315131022_0085_m_000042_360: Not committed because the driver did not authorize commit
        at org.apache.spark.mapred.SparkH

19/03/15 13:34:48 ERROR util.Utils: Uncaught exception in thread driver-heartbeater
java.lang.OutOfMemoryError: Java heap space

I do not know why I get TaskCommitDenied; spark.speculation is disabled by default. I think repartitioning the dataframe may be what causes it. Correct me if I am wrong: because I am overwriting data into a physical Parquet partition, the dataframe partition count must match as well, otherwise multiple tasks may try to overwrite it at the same time. If your dataframe has more partitions than there are physical partitions, this could happen.
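If that reasoning is right, the write would have to be aligned with the physical layout, roughly like this (a sketch of the idea, not code from my job):

import org.apache.spark.sql.functions.col

// Repartition by the table's partition columns so that all rows for one
// physical (a, b) directory end up in a single task, and no two tasks try
// to overwrite the same directory concurrently.
val aligned = mydataframe.repartition(col("a"), col("b"))

Of course, for a skewed partition holding 5 GB+ of data, that single writer task is exactly where the 2 GB limit comes back, which is the tension I am describing.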

So how do I overwrite a huge Parquet partition using Spark SQL?

Additional stack trace (a fragment from one executor; this happens on multiple nodes):

19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 10.443414 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 29.516827 ms
19/03/16 15:39:08 INFO codegen.CodeGenerator: Code generated in 51.646654 ms
19/03/16 15:39:08 INFO parquet.MapredParquetOutputFormat: creating new record writer...org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat@4c0e1ce4
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: initialize serde with table properties.
19/03/16 15:39:08 INFO write.ParquetRecordWriterWrapper: creating real writer to write at maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0/business_id=1224/period_id=684039130/part-00121-b2416c44-6e2c-4f27-993e-0328fd64ea2e.c000
19/03/16 15:39:09 INFO write.ParquetRecordWriterWrapper: real writer: parquet.hadoop.ParquetRecordWriter@5659d6bb
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
19/03/16 15:42:36 INFO mapred.SparkHadoopMapRedUtil: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
19/03/16 15:42:36 ERROR util.Utils: Aborting task
org.apache.spark.executor.CommitDeniedException: attempt_20190316153908_0020_m_000121_0: Not committed because the driver did not authorize commit
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:83)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:171)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:261)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:262)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/03/16 15:42:36 WARN output.FileOutputCommitter: Could not delete maprfs:/mdsMyProject/hive/warehouse/MyProjectint4242.db/table_12341factTable/.hive-staging_hive_2019-03-16_14-57-17_337_1173214459531843754-3/-ext-10000/_temporary/0/_temporary/attempt_20190316153908_0020_m_000121_0
19/03/16 15:42:36 ERROR datasources.FileFormatWriter: Job job_20190316153908_0020 aborted.
19/03/16 15:52:03 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1312
19/03/16 15:52:03 INFO executor.Executor: Running task 7.0 in stage 19.4 (TID 1312)
19/03/16 15:52:03 INFO spark.MapOutputTrackerWorker: Updating epoch to 42 and clearing cache
19/03/16 15:52:03 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 40
19/03/16 15:52:03 INFO client.TransportClientFactory: Successfully created connection to /10.250.70.17:33652 after 2 ms (0 ms spent in bootstraps)
19/03/16 15:52:03 INFO memory.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 17.1 KB, free 11.0 GB)







19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@10.250.70.17:32803)
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/03/16 16:04:38 INFO spark.MapOutputTrackerWorker: Got the output locations
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 32 blocks
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 9 ms
19/03/16 16:04:38 INFO storage.ShuffleBlockFetcherIterator: Started 1 remote fetches in 14 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 153.597538 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 21.508826 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 13.097823 ms
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:18 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:19 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:20 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:21 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:22 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:23 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:24 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:25 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:26 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:27 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:28 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:29 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
19/03/16 16:05:30 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-fc8ff151-20b7-46b2-b1d6-7cc047f1b495
19/03/16 16:05:31 WARN memory.TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
^@^Fstdout^@^C124#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 18350"...





Sometimes I see more of the stack trace when the OOM happens:


19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 19.177788 ms
19/03/16 16:04:38 INFO codegen.CodeGenerator: Code generated in 12.840521 ms
19/03/16 16:05:15 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 10.0 in stage 20.2 (TID 1331)
java.lang.OutOfMemoryError: Java heap space
        at java.io.FileInputStream.close(FileInputStream.java:326)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at org.apache.spark.network.util.LimitedInputStream.close(LimitedInputStream.java:125)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:438)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
19/03/16 16:05:15 INFO storage.DiskBlockManager: Shutdown hook called
19/03/16 16:05:15 ERROR executor.Executor: Exception in task 24.0 in stage 20.2 (TID 1346)
java.lang.OutOfMemoryError: Java heap space
        at java.lang.Integer.valueOf(Integer.java:832)
        at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 INFO executor.Executor: Not reporting error to driver during JVM shutdown.
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1331,5,main]
19/03/16 16:05:15 ERROR util.SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1346,5,main]
java.lang.OutOfMemoryError: Java heap space
        at java.lang.Integer.valueOf(Integer.java:832)
        at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
        at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
        at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:421)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:61)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
19/03/16 16:05:16 INFO util.ShutdownHookManager: Shutdown hook called
19/03/16 16:05:16 INFO util.ShutdownHookManager: Deleting directory /tmp/hadoop-myCompany/nm-local-dir/usercache/myCompany/appcache/application_1552538107854_5893/spark-0d69a896-fbdf-4627-bc0b-7384c9421fa0
^@^Fstdout^@^C122#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 930"...

Tags: apache-spark, apache-spark-sql, parquet

Solution

