apache-spark - Spark 在 Dataset.map 期间错误地反序列化对象
问题描述
我通过 Clojure 中的 JVM 互操作调用 Spark 数据集 API。下面是 Clojure 代码和对应的 Scala 代码。
not_working.clj
(ns erp12.clark-example.repro2
(:gen-class)
(:import (org.apache.spark.sql SparkSession)
(org.apache.spark.sql Encoders)
(scala Function1)
(java.io Serializable)))
(defn -main
[& _]
(-> (SparkSession/builder)
(.master "local[*]")
(.getOrCreate)
(.range 1)
(.map (reify
Function1
(apply [_ row]
row)
Serializable)
(Encoders/LONG))
(.show)))
working.scala
package com.nortia_solutions.ppi
import java.lang.Long
import org.apache.spark.sql.{Encoders, SparkSession}
case object Play extends App {
SparkSession
.builder
.master("local[*]")
.getOrCreate()
.range(1)
.map(new Function[Long, Long] with Serializable {
def apply(v1: Long): Long = v1
})(Encoders.LONG)
.show()
}
Scala 代码按预期工作,但 Clojure 代码抛出以下异常:
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
完整的堆栈跟踪可以在这个 gist中找到。
在以下情况下不会引发异常(并且-main
行为正确):
not_working.clj
被编译成一个uberjar。- 或
-main
在 REPL 中调用。
在以下情况下引发异常:
- 跑步
clj -X:spark erp12.clark-example.repro2/-main
-main
通过测试运行器运行
似乎,根据上下文,Spark 反序列化器不正确地将 a 反序列Function3
化为SerializedLambda
. 为什么会这样?