Running a Spark Aggregator example

Problem description

I am trying to run an example from the Spark 2.4.3 documentation on a Databricks cluster.

I added the missing methods, and the code now looks like this:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator
// spark.implicits._ (which provides toDS) is pre-imported in Databricks notebooks

case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
  def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
  def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
}.toColumn

val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect

The error I get is: org.apache.spark.SparkException: Task not serializable

I found this in the stack trace: Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn

Here is the full stack trace.

My question is: has anyone been able to run similar code? If so, could you point me to a resource where I can learn what I am missing?

Thanks.

Tags: scala, apache-spark

Solution


Put the case class outside the test class, and make the class that encloses the Aggregator Serializable. Both the anonymous Aggregator subclass and an inner case class carry a hidden reference to their enclosing instance, so Spark must serialize that instance when it ships the task; serialization then fails on non-serializable members such as the TypedColumn. Defining Data at the top level removes its outer reference, and marking the enclosing class Serializable takes care of the rest.

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator
import org.junit.Test

// Serializable, so the captured outer instance can be shipped with the task.
// A SparkSession `spark` with `import spark.implicits._` in scope is assumed (needed for toDS).
class Test extends Serializable {

  @Test
  def test62805430(): Unit = {

    val customSummer = new Aggregator[Data, Int, Int] {
      def zero: Int = 0
      def reduce(b: Int, a: Data): Int = b + a.i
      def merge(b1: Int, b2: Int): Int = b1 + b2
      def finish(r: Int): Int = r
      def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
      def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
    }.toColumn

    val ds = Seq(Data(1)).toDS
    val aggregated = ds.select(customSummer).collect
    println(aggregated.mkString(",")) // 1
  }

}

case class Data(i: Int) // defined at top level, outside the test class
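
For comparison, here is a minimal self-contained sketch of an alternative fix (the names CustomSummer and AggregatorDemo and the local-mode session are my own, not from the original answer): defining the Aggregator as a top-level object sidesteps the capture problem entirely, because a top-level object has no enclosing instance for Spark to serialize.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Top-level object: no $outer reference, so nothing non-serializable
// gets dragged into the task closure.
object CustomSummer extends Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

object AggregatorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("agg-demo").getOrCreate()
    import spark.implicits._
    val ds = Seq(Data(1), Data(2)).toDS()
    println(ds.select(CustomSummer.toColumn).collect().mkString(",")) // 3
    spark.stop()
  }
}

Either approach works; the object form makes the serialization boundary explicit instead of relying on the enclosing class being Serializable.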
