首页 > 解决方案 > 连接到 Spark 集群时的序列化问题

问题描述

我有一个用 Scala 编写的 Spark 应用程序,它可以从 Parquet 文件中写入和读取。该应用程序公开一个 HTTP API,当它接收到请求时,通过一个长期存在的上下文将工作发送到 Spark 集群,该上下文在应用程序的生命周期中持续存在。然后它将结果返回给 HTTP 客户端。

当我使用本地模式时,这一切都很好,local[*]作为主人。但是,一旦我尝试连接到 Spark 集群,就会遇到序列化问题。使用 Spark 的默认序列化程序,我得到以下信息:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.FilterExec.otherPreds of type scala.collection.Seq in instance of org.apache.spark.sql.execution.FilterExec.

如果我启用 Kryo 序列化程序,我会得到java.lang.IllegalStateException: unread block data.

当尝试从 Parquet 文件中读取数据时会发生这种情况,但是我认为这与 Parquet 文件本身没有任何关系,只是与发送到 Spark 集群的代码的序列化有关。

从大量互联网搜索中,我收集到这可能是由 Spark 版本甚至 Java 版本之间的不兼容引起的。但是使用的版本是相同的。

该应用程序是用 Scala 2.12.8 编写的,并附带 Spark 2.4.3。Spark 集群运行 Spark 2.4.3(使用 Scala 2.12 编译的版本)。运行 Spark 集群和应用程序的机器使用的是 openJDK 1.8.0_212。

根据另一个互联网搜索,问题可能是由于spark.masterURL 不匹配造成的。所以我设置spark.masterspark-defaults.conf我在应用程序中使用的相同值来连接它。

但是,这并没有解决问题,我现在没有想法了。

标签: scalaapache-sparkapache-spark-sqlcluster-computingparquet

解决方案


我不完全确定基本解释是什么,但我通过将应用程序的 jar 复制到 Spark 的jars目录来修复它。然后我仍然遇到一个错误,但是一个不同的错误:关于一个Cats/kernel/Eq类的缺失。所以我将cats-kerneljar 添加到 Spark 的jars目录中。

现在一切正常。我在另一个 Stack Overflow 线程中读到的内容可能会解释它:

我认为,每当您使用引用项目的方法/类的 lambda 进行任何类型的映射操作时,您都需要将它们作为附加 jar 提供。Spark 确实序列化了 lambda 本身,但没有将其依赖关系整合在一起。不知道为什么错误消息根本没有信息。


推荐阅读