首页 > 解决方案 > org.apache.spark.SparkException:任务不可序列化——Scala

问题描述

我正在阅读一个文本文件,它是固定宽度的文件,我需要将其转换为 csv。我的程序在本地机器上运行良好,但是当我在集群上运行它时,它会抛出“任务不可序列化”异常。

我试图用 map 和 mapPartition 解决同样的问题。

在 RDD 上使用 toLocalIterator 可以正常工作。但它不适用于大文件(我有 8GB 的​​文件)

下面是使用我最近尝试过的 mapPartition 的代码

//读取源文件并创建RDD

def main(){
    var inpData = sc.textFile(s3File)
    LOG.info(s"\n inpData >>>>>>>>>>>>>>> [${inpData.count()}]")

    val rowRDD = inpData.mapPartitions(iter=>{
    var listOfRow = new ListBuffer[Row]
    while(iter.hasNext){
       var line = iter.next()
       if(line.length() >= maxIndex){
          listOfRow += getRow(line,indexList)
        }else{
          counter+=1
        }
     }
    listOfRow.toIterator
    })

    rowRDD .foreach(println)
}

case class StartEnd(startingPosition: Int, endingPosition: Int) extends Serializable

def getRow(x: String, inst: List[StartEnd]): Row = {
    val columnArray = new Array[String](inst.size)
    for (f <- 0 to inst.size - 1) {
      columnArray(f) = x.substring(inst(f).startingPosition, inst(f).endingPosition)
    }
    Row.fromSeq(columnArray)
}

//注意:供您参考,我使用 StartEnd 案例类创建的 indexList,创建后如下所示

[List(StartEnd(0,4), StartEnd(4,10), StartEnd(7,12), StartEnd(10,14))]

这个程序在我的本地机器上运行良好。但是当我安装集群(AWS)时,它会抛出异常,如下所示。

>>>>>>>>Map(ResultantDF -> [], ExceptionString -> 
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable
Exception occurred while applying the FileConversion transformation and the exception Message is :Task not serializable)
[Driver] TRACE reflection.ReflectionInvoker$.invokeDTMethod - Exit

我无法理解这里出了什么问题,什么是不可序列化的,为什么它会抛出异常。

任何帮助表示赞赏。提前致谢!

标签: scalaapache-sparkapache-spark-sql

解决方案


getRow您在 SparkmapPartition转换中调用方法。它迫使 spark 将主类的实例传递给工人。主类包含LOG一个字段。似乎此日志对序列化不友好。你可以

a)转移getRowLOG不同的object(解决此类问题的一般方法)

b) 使 LOG alazy val

c) 使用另一个日志库


推荐阅读