首页 > 解决方案 > 带有 Play-json 验证的 Apache Spark Scala

问题描述

java.lang.UnsupportedOperationException:不支持类型 [trait object] 的架构

trait Container {
  def aa: String
  def bb: Int
}

case class First(aa: String, bb: Int) extends Container
case class Second(aa: String, bb: Int) extends Container

implicit val aaContainerFormat: Format[First] = Json.format[First]

implicit val bbContainerFormat: Format[Second] = Json.format[Second]

implicit def nodeContainerReads: Reads[Container] =
  try {
    Json.format[First].map(x => x: Container) or
    Json.format[Second].map(x => x: Container)
  } catch {
    case e: Exception => Reads {
      case _ => JsError(JsonValidationError("Cannot De-serialize value."))
    }
  }

implicit def nodeContainerWrites = new Writes[Container] {
  override def writes(node: Container): JsValue = node match {
    case a: First => Json.toJson(a)
    case b: Second => Json.toJson(b)
    case _ => Json.obj("error" -> "wrong Json")
  }
}

// Example Usage....
val spark: SparkSession = SparkSession.builder.appName("Unit Test").getOrCreate()
val js: Container = First("unit", "test")

spark.createDataFrame(Seq(js))

我期望 [Container Object] 的 Datasets 的输出,但实际输出是 java.lang.UnsupportedOperationException:不支持 Container 类型的模式。

标签: scalaapache-sparkplay-json

解决方案


Spark 不使用 Play JSON 中的类型类将 Scala 类型转换为 Spark SQL 类型。相反,您需要查看Spark 编码器,它构成了将 Scala 类型转换为 Spark 类型的基础。如果您有 Spark Session 在范围内,您可以使用import sparkSession.implicits._它,它会自动为您的案例类创建编码器。我相信 Spark 不支持开箱即用的 sum 类型,因此您需要实现自己的 Encoder 以某种方式在 Spark 中以特别的方式对其进行建模。如果您想在 Spark 中编码 sum 类型,请阅读此处了解更多信息


推荐阅读