首页 > 解决方案 > org.apache.spark.SparkException:任务不可序列化。斯卡拉火花

问题描述

将现有应用程序从 Spark 1.6 迁移到 Spark 2.2* (最终)会导致错误“org.apache.spark.SparkException: Task not serializable”。我过度简化了我的代码来演示同样的错误。代码查询 parquet 文件以返回以下数据类型:'org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]' 我应用一个函数来提取字符串和整数,返回一个字符串。一个固有的问题与 Spark 2.2 返回数据集而不是数据帧这一点有关。(有关初步错误,请参阅之前的帖子)如何编写数据集编码器以支持将函数映射到 Scala Spark 中的 org.apache.spark.sql.Dataset[String]

var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> def dd(s:String, start: Int) = { s + "some string" }
dd: (s: String, start: Int)String

scala> var d3 = d2.map{s=> dd(s,5) }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
org.apache.spark.SparkException: Task not serializable

我目前对这个问题的解决方案是通过嵌入内联代码(见下文)但不实用,因为我的生产代码涉及大量参数和函数。我还尝试转换为数据框(就像在 spark 1.6 中一样)和函数定义的变体,这些定义并没有证明是一个可行的解决方案。

scala> var d1 = hive.executeQuery(st)
d1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cvdt35_message_id_d: string, cvdt35_input_timestamp_s: decimal(16,5) ... 2 more fields]

scala> val parseCVDP_parquet = (s:org.apache.spark.sql.Row) => s.getString(2).split("0x")(1)+","+s.getDecimal(1);
parseCVDP_parquet: org.apache.spark.sql.Row => String = <function1>

scala> var d2 =  d1.map(parseCVDP_parquet)
d2: org.apache.spark.sql.Dataset[String] = [value: string]

scala> var d3 = d2.map{s=> { s + "some string" } }
d3: org.apache.spark.sql.Dataset[String] = [value: string]

scala> d3.take(1)
20/04/30 15:16:17 WARN TaskSetManager: Stage 0 contains a task of very large size (132 KB). The maximum recommended task size is 100 KB.
res1: Array[String] = Array(761f006000705904,1521833533.96682some string)

标签: scalaapache-spark

解决方案


org.apache.spark.SparkException: Task not serialization

要解决此问题,请将所有函数和变量放入Object. 在需要的地方使用这些函数和变量。

通过这种方式,您可以解决大部分serialization问题

Example

package common
object AppFunctions {
  def append(s: String, start: Int) = s"${s}some thing"
}

object ExecuteQuery {
 import common.AppFunctions._

 [...]

 val d3 = d2.map(s => append(s,5)) // Pass required values to method.

 [...]


}

推荐阅读