What is the best way to call Hadoop FileSystem operations from a Serializable Scala object?

Problem description

/What I am trying to do/

I want to run some Spark UDF transformations over several HDFS buckets that contain BZ2 files. I have defined a MyMain Scala object that extends Serializable, since it involves calling the UDF transformations on each HDFS bucket.

However, before running the UDF transformations, I need to filter down to the HDFS buckets that actually contain BZ2 files. This requires Hadoop FileSystem operations, which I keep inside the MyMain.main method so that these computations stay in driver memory rather than being distributed to the worker nodes, since, as I understand it, FileSystem is not serializable.

However, even after creating a separate serializable HadoopUtils class with a singleton companion object, and calling all the FileSystem operations from MyMain.main, I still get a "Task not serializable" exception (see below).

/Question/

What is the right way to call non-serializable FileSystem operations from a serializable object such as MyMain? Also, why does class HadoopUtils extends Serializable still not appear to be serializable, even though it is declared that way?

/My code/

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val prependtoList = (x1: String, x2: List[String]) => x2.map(x1 + _)

class HadoopUtils extends Serializable {

  def existsDir(fs: FileSystem, path: String) : Boolean = {
    val p = new Path(path)
    fs.exists(p) && fs.getFileStatus(p).isDirectory
  }
  def ifBZFileExists(fs: FileSystem, bucketBZDir: String) : Boolean = {
    val path = new Path(bucketBZDir)
    val fileStatus = fs.listStatus(path).filter(
      p => { p.isFile && p.getPath.getName.endsWith(".bz2")}
    )
    !fileStatus.isEmpty
  }

  def getBZ2Buckets(fs: FileSystem, lookupPath: String) : List[String] = {
    //Filter the list of buckets having at least one BZ2 file in it
    val range = (1 to 16).toList.map(x => x.toString)
    val buckets = prependtoList("Bucket",range)
    val allBuckets = prependtoList(lookupPath + "/", buckets)
    //From Bucket1 to Bucket16, filter the buckets that are existing e.g. Bucket5 may not exist
    val existingBuckets = allBuckets.filter(p => { existsDir(fs,p) })
    val BZ2BucketPaths = existingBuckets.filter(path => { ifBZFileExists(fs,path) }).map(
        path => { path + "/*.bz2" })
    BZ2BucketPaths
  }
}

object HadoopUtils {
  val getHadoopUtils = new HadoopUtils
}

object MyMain extends Serializable {
  val clusterNameNodeURL = "hdfs://mycluster.domain.com:8020"
  val basePath = "/path/to/buckets"
  def main(args: Array[String]): Unit = {
    //NOTE: spark, hadoopfs defined in main so as to be processed in Driver
    val spark = SparkSession
      .builder()
      .appName("My_App")
      .enableHiveSupport()
      .getOrCreate()

    val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    val BZ2Buckets = 
      HadoopUtils.getHadoopUtils.getBZ2Buckets(hadoopfs, clusterNameNodeURL + basePath)

    BZ2Buckets.foreach(path => {
      //Doing Spark UDF transformations on each bucket, which needs to be serialized
    })


  }
}

/Stack trace of the exception/

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:197)
  ... 51 elided
Caused by: java.io.NotSerializableException: HadoopUtils$
Serialization stack:
    - object not serializable (class: HadoopUtils$, value: HadoopUtils$@7f5bab61)
    - field (class: $iw, name: HadoopUtils$module, type: class HadoopUtils$)
    - object (class $iw, $iw@3f4a0d43)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@74d06d1e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@f9764ea)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6821099e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@4f509444)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11462802)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@11d2d501)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@284fd700)
    - field (class: $line14.$read, name: $iw, type: class $iw)
    - object (class $line14.$read, $line14.$read@46b4206a)
    - field (class: $iw, name: $line14$read, type: class $line14.$read)
    - object (class $iw, $iw@33486894)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@25980fc9)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@1fb0d28d)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42ea11d5)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@42d28cc1)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@22131a73)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@631878e1)
    - field (class: $line18.$read, name: $iw, type: class $iw)
    - object (class $line18.$read, $line18.$read@561c52c0)
    - field (class: $iw, name: $line18$read, type: class $line18.$read)
    - object (class $iw, $iw@1d5b8be2)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@4de4c672)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 85 more

Tags: scala, apache-spark, hadoop, serialization

Solution


It turns out the Task not serializable problem is not caused by the HadoopUtils class itself, which does extend Serializable. Because, on the driver, the HadoopUtils instance is reached through the singleton companion object HadoopUtils (i.e. HadoopUtils.getHadoopUtils), that companion object gets captured and has to be serialized together with the MyMain object, and object HadoopUtils does not extend Serializable, which is exactly what the NotSerializableException: HadoopUtils$ in the stack trace reports.
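
A minimal sketch of one possible fix, assuming the rest of the code stays as shown in the question: make the companion object itself Serializable, so that the closure capturing it can be serialized.

class HadoopUtils extends Serializable {
  // FileSystem helper methods unchanged from the question
}

// It is the companion object (HadoopUtils$) that the closure captures,
// so it needs to be Serializable as well.
object HadoopUtils extends Serializable {
  val getHadoopUtils = new HadoopUtils
}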

A solution to this problem can be found here.
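
Another pattern (a sketch, not necessarily the one from the linked answer) is to avoid capturing the companion object at all: copy the instance into a local val on the driver, so any closure only captures the serializable HadoopUtils instance rather than the HadoopUtils$ singleton.

// Inside MyMain.main, before the per-bucket transformations:
val utils: HadoopUtils = HadoopUtils.getHadoopUtils  // local val, captured instead of HadoopUtils$

BZ2Buckets.foreach(path => {
  // refer only to `utils` (never to HadoopUtils.getHadoopUtils) in any code
  // that Spark has to serialize and ship to the executors
})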

