apache-spark - How to tune PySpark to process a large number of partitions?
Problem description
In the Spark code below, I read roughly 300,000 HDFS files (2.4 TB in total), run a combineByKey() operation, and then save the data to a separate HDFS directory. A very simple job, except for the large number of partitions.
sc.textFile(input_path + "*"). \
    map(lambda v: (v[0], v[1])). \
    combineByKey(to_list, append, extend). \
    map(lambda v: json.dumps(v)). \
    saveAsTextFile(output_path)
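The combiner functions referenced in the snippet (to_list, append, extend) are not shown in the question. A minimal sketch, assuming they follow the standard combineByKey "group values into a list" pattern from the Spark documentation (the bodies below are an assumption, not the asker's actual code):

```python
def to_list(v):
    # createCombiner: start a new list for the first value seen for a key
    return [v]

def append(acc, v):
    # mergeValue: add one value to a partition-local list
    acc.append(v)
    return acc

def extend(a, b):
    # mergeCombiners: merge two partition-local lists for the same key
    a.extend(b)
    return a
```

These are plain Python functions, so they can be unit-tested without a SparkContext.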
The problem is that after the combineByKey stage completes, the job aborts with the stack trace below (which I don't understand). I ran the same code on a sample subset of files (~50,000) and it finished without any errors. So I suspected the failure was caused by the high number of tasks, and added a coalesce() call after combineByKey() to reduce the number of partitions. That produced the same error. Any idea how to recover from this?
Caused by: java.io.IOException: Read error or truncated source
at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:154)
at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:3014)
at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3095)
at java.io.DataInputStream.readShort(DataInputStream.java:313)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3276)
at java.io.ObjectInputStream.readShort(ObjectInputStream.java:1082)
at org.roaringbitmap.RoaringArray.deserialize(RoaringArray.java:343)
at org.roaringbitmap.RoaringArray.readExternal(RoaringArray.java:818)
at org.roaringbitmap.RoaringBitmap.readExternal(RoaringBitmap.java:2134)
at org.apache.spark.scheduler.HighlyCompressedMapStatus.$anonfun$readExternal$2(MapStatus.scala:210)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1343)
at org.apache.spark.scheduler.HighlyCompressedMapStatus.readExternal(MapStatus.scala:207)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2236)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$1(MapOutputTracker.scala:956)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:958)
at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:972)
at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Traceback (most recent call last):
File "xxx.py", line 450, in <module>
step5()
File "xxx.py", line 387, in step5
sc.textFile(input_path + "*"). \
File "/usr/local/hadoop/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1656, in saveAsTextFile
File "/usr/local/hadoop/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/local/hadoop/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/usr/local/hadoop/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o71.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
Solution