apache-spark - Spark: Py4JJavaError: An error occurred while calling o142.saveAsTextFile
Problem Description
When I save a pair RDD with rdd.repartition(1).saveAsTextFile(file_path), I get the following error.
Py4JJavaError: An error occurred while calling o142.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1499)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1478)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/home/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
[Previous line repeated 2 more times]
File "/home/spark/python/pyspark/rdd.py", line 350, in func
return f(iterator)
File "/home/spark/python/pyspark/rdd.py", line 1951, in groupByKey
merger.mergeCombiners(it)
File "/home/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 288, in mergeCombiners
self._spill()
File "/home/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 735, in _spill
self.serializer.dump_stream([(k, self.data[k])], streams[h])
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 383, in dump_stream
bytes = self.serializer.dumps(vs)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 634, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 562, in dumps
return pickle.dumps(obj, protocol)
MemoryError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
... 41 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/spark/python/lib/pyspark.zip/pyspark/worker.py", line 253, in main
process()
File "/home/spark/python/lib/pyspark.zip/pyspark/worker.py", line 248, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
File "/home/spark/python/pyspark/rdd.py", line 2440, in pipeline_func
return func(split, prev_func(split, iterator))
[Previous line repeated 2 more times]
File "/home/spark/python/pyspark/rdd.py", line 350, in func
return f(iterator)
File "/home/spark/python/pyspark/rdd.py", line 1951, in groupByKey
merger.mergeCombiners(it)
File "/home/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 288, in mergeCombiners
self._spill()
File "/home/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 735, in _spill
self.serializer.dump_stream([(k, self.data[k])], streams[h])
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 331, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 383, in dump_stream
bytes = self.serializer.dumps(vs)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 634, in dumps
return zlib.compress(self.serializer.dumps(obj), 1)
File "/home/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 562, in dumps
return pickle.dumps(obj, protocol)
MemoryError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:330)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:470)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Solution
The repartition method triggers a full shuffle, and repartition(1) forces the entire dataset into a single task. With a large dataset this runs out of memory (the MemoryError in the traceback is raised while spilling the groupByKey results). Try increasing the partition value passed to repartition instead of using 1.
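For reference, a minimal sketch of the suggested change. The SparkContext setup, input/output paths, key-extraction logic, and the partition count of 200 are placeholders, not details taken from the question; only the groupByKey step is inferred from the traceback.

# Build a pair RDD (groupByKey appears in the traceback) and save it
# with many partitions instead of forcing everything into one task.
from pyspark import SparkContext

sc = SparkContext(appName="save-pair-rdd")

pairs = sc.textFile("hdfs:///input/data.txt") \
          .map(lambda line: (line.split(",")[0], line)) \
          .groupByKey()

# repartition(1) made a single Python worker hold the whole dataset while
# spilling; a larger partition count spreads that memory pressure out.
pairs.repartition(200).saveAsTextFile("hdfs:///output/pairs")

If a single output file is still required, it is usually safer to write with many partitions and merge the part files afterwards (for example with hdfs dfs -getmerge) than to force the whole dataset through one task.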