How to tune PySpark to optimize processing of a large number of partitions?

Problem Description

In the Spark code below, I am reading roughly 300,000 HDFS files (2.4 TB in total), running a combineByKey() operation, and then saving the result to a separate HDFS directory. It is a very simple job, except for the large number of partitions.

import json

# to_list, append and extend are the createCombiner, mergeValue and
# mergeCombiners arguments of combineByKey().
sc.textFile(input_path + "*"). \
        map(lambda v: (v[0], v[1])). \
        combineByKey(to_list, append, extend). \
        map(lambda v: json.dumps(v)). \
        saveAsTextFile(output_path)
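
The three helper functions passed to combineByKey() are not shown in the question. For this list-accumulation pattern they are usually written along the following lines; this is only a sketch of typical helpers, not the author's actual code:

def to_list(v):
    # createCombiner: start a new list from the first value seen for a key
    return [v]

def append(acc, v):
    # mergeValue: add one more value to a per-partition list
    acc.append(v)
    return acc

def extend(acc1, acc2):
    # mergeCombiners: merge two per-partition lists for the same key
    acc1.extend(acc2)
    return acc1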

The problem is that after the combineByKey stage completes, the job aborts with the stack trace below, which I don't understand. I ran the same code on a subset of the input (~50,000 files) and it finished without any error. So I assumed the failure was caused by the large number of tasks, and I added a coalesce() call after combineByKey() to reduce the number of partitions (a sketch of that variant is shown after the traceback). That produced exactly the same error. Any idea how to recover from this?

Caused by: java.io.IOException: Read error or truncated source
    at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:154)
    at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
    at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:3014)
    at java.io.ObjectInputStream$BlockDataInputStream.read(ObjectInputStream.java:3095)
    at java.io.DataInputStream.readShort(DataInputStream.java:313)
    at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3276)
    at java.io.ObjectInputStream.readShort(ObjectInputStream.java:1082)
    at org.roaringbitmap.RoaringArray.deserialize(RoaringArray.java:343)
    at org.roaringbitmap.RoaringArray.readExternal(RoaringArray.java:818)
    at org.roaringbitmap.RoaringBitmap.readExternal(RoaringBitmap.java:2134)
    at org.apache.spark.scheduler.HighlyCompressedMapStatus.$anonfun$readExternal$2(MapStatus.scala:210)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1343)
    at org.apache.spark.scheduler.HighlyCompressedMapStatus.readExternal(MapStatus.scala:207)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2236)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2185)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.MapOutputTracker$.$anonfun$deserializeMapStatuses$1(MapOutputTracker.scala:956)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:958)
    at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:972)
    at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
    at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
    at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
    at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
    at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Traceback (most recent call last):
  File "xxx.py", line 450, in <module>
    step5()
  File "xxx.py", line 387, in step5
    sc.textFile(input_path + "*"). \
  File "/usr/local/hadoop/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1656, in saveAsTextFile
  File "/usr/local/hadoop/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/local/hadoop/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
  File "/usr/local/hadoop/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o71.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
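
For reference, the coalesce() variant mentioned above would look roughly like this; the target partition count of 10000 is only a placeholder for illustration, not a value taken from the question:

import json

sc.textFile(input_path + "*"). \
        map(lambda v: (v[0], v[1])). \
        combineByKey(to_list, append, extend). \
        coalesce(10000). \
        map(lambda v: json.dumps(v)). \
        saveAsTextFile(output_path)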

Tags: apache-spark, pyspark

Solution

