首页 > 解决方案 > 在 Spark 中调用 Scala UDF 时,如何将 BinaryType 转换为 Array[Byte]?

问题描述

我在 Scala 中编写了以下 UDF:

import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import java.util.zip.{GZIPInputStream}

def Decompress(compressed: Array[Byte]): String = {
  val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
  val output = scala.io.Source.fromInputStream(inputStream).mkString
  
  return output
}

val decompressUdf = (compressed: Array[Byte]) => {
  Decompress(compressed)
}

spark.udf.register("Decompress", decompressUdf)

然后,我尝试使用以下内容调用 UDF:

val sessionsRawDF =
  sessionRawDF
    .withColumn("WebsiteSession", decompressUdf(sessionRawDF("body")))
    .select(
      current_timestamp().alias("ingesttime"),
      current_timestamp().cast("date").alias("p_ingestdate"),
      col("partition"),
      col("enqueuedTime"),
      col("WebsiteSession").alias("Json")
    )

当我运行它时,我收到以下错误:

command-130062350733681:9:错误:类型不匹配;
找到:org.apache.spark.sql.Column required:Array[Byte] decompressUdf(col("WebsiteSession")).alias("Json")

在这种情况下,我的印象是 Spark 会隐式获取值并从 spark 类型转到 Array[Byte] 。

请有人帮助我了解发生了什么,我已经为此奋斗了一段时间,不知道还有什么可以尝试的。

标签: scalaapache-sparkuser-defined-functions

解决方案


您需要先将 Scala 函数转换为 Spark UDF,然后才能将其注册为 UDF。例如,

val decompressUdf = udf(Decompress _)

spark.udf.register("Decompress", decompressUdf)

实际上,如果只是在 DataFrame API 中使用 UDF,则无需注册它。您可以简单地运行第一行并使用decompressUdf. 仅当您想在 SQL 中使用 UDF 时才需要注册。


推荐阅读