amazon-web-services - EMR 生成文件的 Spark Kryo 反序列化在本地失败
问题描述
在将 EMR 版本升级到 6.2.0(我们之前使用 5.0 beta - ish)和 Spark 3.0.1 后,我们注意到我们无法在本地读取从 EMR 集群写入的 Kryo 文件(这在之前显然是可能的)。尝试读取此类文件时,抛出的异常如下:
com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2
我们使用 spark 3.0.1 和 Kryo 4.0.2(捆绑)并使用 Kryo::readClassAndObject 读取 Kryo 文件,使用 SparkContext::sequenceFile 对 RDD 重新读取进行操作。
解决方案
TL;DR:AWS EMR 6.2.0(可能更早)导致从 EMR 集群写入的 Kryo 文件的本地反序列化失败(由于集群运行 AWS Spark 分叉)。要修复的代码附在帖子末尾。
最近,Amazon EMR 集群运行自己的 Apache Spark 分支(即,对于 EMR 6.2.0 集群,Spark 版本是 3.0.1.amzn-0),其中包含了 Kryo 作为我们自己使用的默认序列化框架。自从升级到 6.2.0 后,我们注意到我们无法在本地读取从 EMR 6.2.0 集群写入的 Kryo 文件,它们会失败并显示如下消息:
com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2
我们试图读取的 RDD 确实是 Tuple2 类型的 RDD,但显然在反序列化时,Kryo 认为它出于某种原因被编码为 Tuple3 的 RDD。
现在,在内部,Kryo 拥有一个 ID <-> 类的映射,该映射是在运行时构建的,预计在读写 JVM 之间是一致的(用于确定要反序列化到哪个类)。该注册表建立在 Kryo 实例的实例化之上(我们使用 org.apache.spark.serializer.KryoSerializer::newKryo)。经过检查,我们注意到在执行序列化的 EMR 集群和我们的本地机器之间,Tuple2 的 ID 确实不同,这种差异归因于 EMR 设置中存在的单个类,而不是本地 - 这个类是 org.apache .spark.scheduler.HighlyCompressedMapStatus$CompressedSizes 在任何公开可用的 Spark 代码中都不存在,因此我们将其归因于 Amazon spark fork。
这里丑陋的解决方法是使用 Kryo 实例的 ClassResolver。如果注册表中不存在 CompressedSizes 类,我们将所有 id x >= 13 的类注册为 x + 1。这确实很难看,但作为本地修复,它可以工作。显然,它也可能会因 EMR/Kryo/Spark 的新版本而中断,所以要格外小心(我们只在本地使用它进行调试,这仍然很多)。
代码:以前,我们会像这样创建 Kryo 实例:
val kryoSerializer = new KryoSerializer(sc.getConf)
val kryo = kryoSerializer.newKryo()
现在,我们使用这个:
val kryo = adjustRegistrationsForEmrSpark(kryoSerializer.newKryo())
在哪里
private def adjustRegistrationsForEmrSpark(kryo: Kryo): Kryo = {
val existingRegistrations = getRegistrations(kryo)
val emrSpecificClassExists = existingRegistrations.exists(_.getType.getName.contains("CompressedSizes"))
if (emrSpecificClassExists) {
println(s"detected emr-specific class when creating kryo, not making any adjustments")
kryo
} else {
println(s"emr-specific class missing from registrations, adjusting existing classes by an offset of 1 to compensate")
val classResolver = kryo.getClassResolver
existingRegistrations.filter(_.getId >= 13).foreach { registration =>
val toRegister = new Registration(registration.getType, registration.getSerializer, registration.getId + 1)
classResolver.register(toRegister)
}
kryo
}
}
private def getRegistrations(kryo: Kryo): List[Registration] = {
var classIndex = 0
var reg: Registration = null
var result: List[Registration] = List()
do {
reg = kryo.getClassResolver.getRegistration(classIndex)
if (reg != null) result ++= List(reg)
classIndex = classIndex + 1
} while (reg != null)
result
}
推荐阅读
- c# - .net mvc 我可以在 razor 中使用 razor
- python - asyncio.sleep 阻塞了?
- reactjs - 基于按钮单击打开独特的模式或弹出器
- julia - 为什么 Julia 函数名(不带参数)会被静默忽略?
- android-studio - 我想知道如何在 android studio 中制作一个浮动按钮,即使在关闭活动后仍保留在屏幕上
- javascript - 返回具有特定顺序的特定索引处的数字的数组的函数
- php - Apache 服务器上的 websocket 配置文件
- java - 如何从蓝图(Camel-http)动态设置 HTTP 方法
- javascript - 如何根据那里的键在平面列表中选择多个值
- android - Java android通过蓝牙连接到设备