首页 > 解决方案 > Spark 在 Dataset.map 期间错误地反序列化对象

问题描述

我通过 Clojure 中的 JVM 互操作调用 Spark 数据集 API。下面是 Clojure 代码和对应的 Scala 代码。

not_working.clj

(ns erp12.clark-example.repro2
  (:gen-class)
  (:import (org.apache.spark.sql SparkSession)
           (org.apache.spark.sql Encoders)
           (scala Function1)
           (java.io Serializable)))

(defn -main
  [& _]
  (-> (SparkSession/builder)
      (.master "local[*]")
      (.getOrCreate)
      (.range 1)
      (.map (reify
              Function1
              (apply [_ row]
                row)
              Serializable)
            (Encoders/LONG))
      (.show)))

working.scala

package com.nortia_solutions.ppi

import java.lang.Long
import org.apache.spark.sql.{Encoders, SparkSession}

case object Play extends App {
  SparkSession
    .builder
    .master("local[*]")
    .getOrCreate()
    .range(1)
    .map(new Function[Long, Long] with Serializable {
      def apply(v1: Long): Long = v1
    })(Encoders.LONG)
    .show()
}

Scala 代码按预期工作,但 Clojure 代码抛出以下异常:

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

完整的堆栈跟踪可以在这个 gist中找到。

在以下情况下不会引发异常(并且-main行为正确):

在以下情况下引发异常:

似乎,根据上下文,Spark 反序列化器不正确地将 a 反序列Function3化为SerializedLambda. 为什么会这样?

标签: apache-spark

解决方案


推荐阅读