scala - 运行 Spark 聚合器示例
问题描述
我正在尝试运行在Databricks 集群上找到的Spark 2.4.3 文档中的示例。
我添加了缺少的方法,现在代码如下所示:
case class Data(i: Int)
val customSummer = new Aggregator[Data, Int, Int] {
def zero: Int = 0
def reduce(b: Int, a: Data): Int = b + a.i
def merge(b1: Int, b2: Int): Int = b1 + b2
def finish(r: Int): Int = r
def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
}.toColumn
val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect
我得到的错误是:org.apache.spark.SparkException: Task not serializable
我在堆栈跟踪中找到了这个:Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn
问题是,有人能够运行类似的代码吗?如果是这样,您能否指出我可以了解我缺少什么的资源?
谢谢。
解决方案
将案例类放在测试类之外,并使外部类具有可Aggregator
序列化。
class Test extends Serializable {
@Test
def test62805430(): Unit = {
val customSummer = new Aggregator[Data, Int, Int] {
def zero: Int = 0
def reduce(b: Int, a: Data): Int = b + a.i
def merge(b1: Int, b2: Int): Int = b1 + b2
def finish(r: Int): Int = r
def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
}.toColumn
val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect
println(aggregated.mkString(",")) // 1
}
}
case class Data(i: Int)
推荐阅读
- reactjs - AWS Amplify - 如何在登录后呈现组件
- typescript - 在 Typescript 文件中导入 Firebase Analytics
- html - CSS Grid carousel - 最后一项正在缩小
- javascript - 是否有在我的 Android 上运行 JavaScript 项目的命令?
- python - solve_ivp 缺少 1 个必需的位置参数错误
- r - 如何在R中仅对匹配的键值求和
- xamarin.forms - android上的xamarin.forms Collectionview中没有可见的选择
- c++ - 是什么导致“指定给 RtlValidateHeap(01480000, 014A2900) C-SDL-Blackjack.exe 的地址无效已触发断点。” 在这个项目中?
- c# - 当数据从 C# 到达时,Angular 中的 getDay() 未定义
- java - 如何在可见的两个视图之间切换?