scala - 在 Spark 中调用 Scala UDF 时,如何将 BinaryType 转换为 Array[Byte]?
问题描述
我在 Scala 中编写了以下 UDF:
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPInputStream}
def Decompress(compressed: Array[Byte]): String = {
val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
val output = scala.io.Source.fromInputStream(inputStream).mkString
return output
}
val decompressUdf = (compressed: Array[Byte]) => {
Decompress(compressed)
}
spark.udf.register("Decompress", decompressUdf)
然后,我尝试使用以下内容调用 UDF:
val sessionsRawDF =
sessionRawDF
.withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
.select(
current_timestamp().alias("ingesttime"),
current_timestamp().cast("date").alias("p_ingestdate"),
col("partition"),
col("enqueuedTime"),
col("WebsiteSession").alias("Json")
)
当我运行它时,我收到以下错误:
command-130062350733681:9:错误:类型不匹配;
找到:org.apache.spark.sql.Column required:Array[Byte] decompressUdf(col("WebsiteSession")).alias("Json")
在这种情况下,我的印象是 Spark 会隐式获取值并从 spark 类型转到 Array[Byte] 。
请有人帮助我了解发生了什么,我已经为此奋斗了一段时间,不知道还有什么可以尝试的。
解决方案
您需要先将 Scala 函数转换为 Spark UDF,然后才能将其注册为 UDF。例如,
val decompressUdf = udf(Decompress _)
spark.udf.register("Decompress", decompressUdf)
实际上,如果只是在 DataFrame API 中使用 UDF,则无需注册它。您可以简单地运行第一行并使用decompressUdf
. 仅当您想在 SQL 中使用 UDF 时才需要注册。
推荐阅读
- multithreading - 互斥锁被 LOCKED 但预期的线程锁定
- docker - 如何在 Docker 容器上重新启动 Oracle WebLogic
- php - 如何在不使用外部 PHP 库的情况下在 MySQL 中进行弹性搜索?
- css - DIV 如何在 2 个不同的位置分别显示背景和文本?
- ionic-v1 - Ionic V1 Cordova 未定义
- python - python使用shell命令并发送输出
- jquery - 自定义 Bootstrap 4 scrollspy 未显示活动导航链接
- rest - 用于创建 TFVC 分支的 VSTS REST API
- php - 如何仅在特定页面中捆绑.min.css(在我的情况下为contact.php)
- reactjs - React-google-maps 缩放问题