scala - 调用 udf 触发数据帧时任务不可序列化错误
问题描述
我有一个用于加密的 scala 函数,然后从中创建一个udf
并将其传递给我的als_embeddings
数据框中的一列,以将新列添加到我的数据框中。
import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
val Algorithm = "AES/CBC/PKCS5Padding"
val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
val IvSpec = new IvParameterSpec(new Array[Byte](16))
def encrypt(text: String): String = {
val cipher = Cipher.getInstance(Algorithm)
cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)
new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
}
val encryptUDF = udf((uid : String) => encrypt(uid))
将上面传递encryptUDF
给我的 spark 数据框以创建一个加密的新列uid
val als_encrypt_embeddings = als_embeddings.withColumn("encrypt_uid",encryptUDF(col("uid")))
als_encrypt_embeddings.show()
但是当我这样做时,它给了我以下错误:
线程“主”org.apache.spark.SparkException 中的异常:任务不可序列化
我在这里想念什么。
解决方案
错误信息Task not serializable
是正确的,但不是很清楚。在堆栈跟踪的更下方,有更详细的解释出了什么问题:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
[...]
Caused by: java.io.NotSerializableException: javax.crypto.spec.IvParameterSpec
Serialization stack:
- object not serializable (class: javax.crypto.spec.IvParameterSpec, value: javax.crypto.spec.IvParameterSpec@7d4d65f5)
- field (class: Starter$$anonfun$1, name: IvSpec$1, type: class javax.crypto.spec.IvParameterSpec)
- object (class Starter$$anonfun$1, <function1>)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 48 more
在Caused by
stacktrace 的一部分中,Spark 报告说它无法序列化javax.crypto.spec.IvParameterSpec
.
ParameterSpec 已在驱动程序 JVM 中创建,同时 udf 在其中一个执行程序上执行。因此,必须对对象进行序列化才能将其移动到执行程序的 VM。由于对象不可序列化,因此移动它的尝试失败。
解决该问题的最简单方法是通过将代码块移动到 udf 的闭包中,直接在执行程序的 VM 中创建加密所需的对象:
val encryptUDF = udf((uid : String) => {
val Algorithm = "AES/CBC/PKCS5Padding"
val Key = new SecretKeySpec(Base64.getDecoder.decode("BiwHeIqzQa8X6MXtdg/hhQ=="), "AES")
val IvSpec = new IvParameterSpec(new Array[Byte](16))
def encrypt(text: String): String = {
val cipher = Cipher.getInstance(Algorithm)
cipher.init(Cipher.ENCRYPT_MODE, Key, IvSpec)
new String(Base64.getEncoder.encode(cipher.doFinal(text.getBytes("utf-8"))), "utf-8")
}
encrypt(uid)
})
这样,所有对象都将直接在 executors VM 中创建。
这种方法的缺点是每次调用 udf 都会创建一组加密对象。如果这些对象的实例化成本很高,这可能会导致性能问题。一种选择是使用mapPartitions而不是 udf。在这个答案中,mapPartitions 用于避免在迭代数据帧时创建太多昂贵的数据库连接。这种方法也可以在这里使用。
推荐阅读
- python-3.x - Discord.py 重写:函数突然停止工作
- python-3.x - 如何通过多处理划分解析函数的产量?
- haskell - 匿名函数中的参数
- r - 以正确的顺序插入行,如果它们不存在于当前位置
- c# - C# .NET 4.6.1 Entity Framework - DB.MyTable.Add(...) 很慢,尽管没有调用 DB.SaveChanges()
- android - android - 如何在 kotlin 的设置活动(首选项)中使用监听器
- javascript - 我将如何添加一个等级系统来增加分数?
- pandas - Pandas read_csv 在使用 skiprows 时返回太多行
- angular - 如何在不使用订阅的情况下从可观察对象中获取值?
- algorithm - 无向与有向图中的最长路径