Spark: Task not serializable at foreachPartition

Problem description

I am getting a Task not serializable error at the foreachPartition call below. Here is the code snippet:

documents.repartition(1).foreachPartition( allDocuments => {

  val luceneIndexWriter: IndexWriter = getIndexWriter(localLuceneIndexDirPath)
  val protosCache = Files.newOutputStream(Paths.get(s"${localLuceneIndexDirPath}/${PROTOS_CACHE_FILE}"))
  val protosMdFile = Files.newOutputStream(Paths.get(s"${localLuceneIndexDirPath}/${PROTOS_MD_FILE}"))
  val DOCID: AtomicInteger = new AtomicInteger(1)
  val umcIdsCache = new mutable.ListBuffer[String] 

  allDocuments.foreach ( row => {
  .....
  })
  luceneIndexWriter.commit()
  luceneIndexWriter.close()
})

I am iterating over each document and generating a Lucene index. My guess is that the various writers declared here are not serializable, hence the exception. But the writers are defined per partition, so it should not complain. Any ideas?
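
Note that the IndexWriter and the OutputStreams created inside foreachPartition never need to be serialized themselves; what Spark serializes is the closure, and this closure references getIndexWriter, localLuceneIndexDirPath, PROTOS_CACHE_FILE and PROTOS_MD_FILE, which are presumably members of the enclosing job class, so the whole job object gets dragged into the task. Below is a minimal sketch of keeping the closure self-contained, assuming those members can be copied into local vals and the writer factory can live in a standalone helper; IndexWriterSupport/buildIndexWriter are illustrative names, and the Lucene 5+-style constructor calls are shown only for illustration:

import java.nio.file.{Files, Paths}

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
import org.apache.lucene.store.FSDirectory

// Illustrative standalone helper: a top-level object, not a member of the
// job class, so calling it inside the closure does not capture the job object.
object IndexWriterSupport {
  def buildIndexWriter(indexDir: String): IndexWriter =
    new IndexWriter(
      FSDirectory.open(Paths.get(indexDir)),
      new IndexWriterConfig(new StandardAnalyzer()))
}

// Copy everything the closure needs into plain local vals, so the task
// captures serializable strings instead of `this`.
val indexDirPath    = localLuceneIndexDirPath
val protosCachePath = s"$localLuceneIndexDirPath/$PROTOS_CACHE_FILE"
val protosMdPath    = s"$localLuceneIndexDirPath/$PROTOS_MD_FILE"

documents.repartition(1).foreachPartition { allDocuments =>
  // Everything below is created on the executor inside the running task,
  // so the IndexWriter and OutputStreams are never serialized at all.
  val luceneIndexWriter = IndexWriterSupport.buildIndexWriter(indexDirPath)
  val protosCache = Files.newOutputStream(Paths.get(protosCachePath))
  val protosMd    = Files.newOutputStream(Paths.get(protosMdPath))

  allDocuments.foreach { row =>
    // ... index each row as before ...
  }

  luceneIndexWriter.commit()
  luceneIndexWriter.close()
  protosCache.close()
  protosMd.close()
}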

Edit 2: I added extends Serializable, and now I am facing the following error:

21/11/12 18:35:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, cmdata0804.usmsc23.pie.com, executor 1): java.io.InvalidClassException: com.science.video.indexer.LuceneIndexerJob$; no valid constructor
at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:169)
at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:874)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2043)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/11/12 18:35:23 INFO cluster.YarnClusterScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool
21/11/12 18:35:23 INFO cluster.YarnClusterScheduler: Cancelling stage 2
21/11/12 18:35:23 INFO cluster.YarnClusterScheduler: Killing all running tasks in stage 2: Stage cancelled
21/11/12 18:35:23 INFO scheduler.DAGScheduler: ResultStage 2 (foreachPartition at LuceneIndexerHelper.scala:94) failed in 0.693 s due to Job aborted due to stage failure: Aborting TaskSet 2.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist.
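
The "no valid constructor" part of the trace is plain Java serialization behaviour rather than anything Spark-specific: during deserialization the JVM must invoke the no-argument constructor of the closest non-Serializable superclass of the object being read, and if no such constructor exists, readObject fails exactly as above (writing succeeds; only reading throws). A minimal, self-contained illustration of that failure mode, with made-up class names unrelated to the job:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// A superclass that is not Serializable and has no no-arg constructor.
class NonSerializableBase(val label: String)

// Marking the subclass Serializable is not enough: deserialization has to
// call a no-arg constructor on NonSerializableBase, which does not exist.
class Job(label: String) extends NonSerializableBase(label) with Serializable

object NoValidConstructorDemo extends App {
  val bytes = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(new Job("lucene-indexer"))   // serializing the object works
    oos.close()
    bos.toByteArray
  }

  val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
  in.readObject()   // java.io.InvalidClassException: Job; no valid constructor
}

So rather than making LuceneIndexerJob$ Serializable, the usual way around both errors is to keep the foreachPartition closure free of references to the job object, as sketched earlier.
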
Tags: scala, apache-spark, apache-spark-sql, lucene
