Spark Scala DateType schema execution error

Problem

When I try to create a schema for a DataFrame in Spark Scala, I get an execution error that shows:

Exception in thread "main" java.lang.IllegalArgumentException: No support for Spark SQL type DateType
    at org.apache.kudu.spark.kudu.SparkUtil$.sparkTypeToKuduType(SparkUtil.scala:81)
    at org.apache.kudu.spark.kudu.SparkUtil$.org$apache$kudu$spark$kudu$SparkUtil$$createColumnSchema(SparkUtil.scala:134)
    at org.apache.kudu.spark.kudu.SparkUtil$$anonfun$kuduSchema$3.apply(SparkUtil.scala:120)
    at org.apache.kudu.spark.kudu.SparkUtil$$anonfun$kuduSchema$3.apply(SparkUtil.scala:119)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.kudu.spark.kudu.SparkUtil$.kuduSchema(SparkUtil.scala:119)
    at org.apache.kudu.spark.kudu.KuduContext.createSchema(KuduContext.scala:234)
    at org.apache.kudu.spark.kudu.KuduContext.createTable(KuduContext.scala:210)

The code is as follows:

val invoicesSchema = StructType(
    List(
        StructField("id", StringType, false),
        StructField("invoicenumber", StringType, false),
        StructField("invoicedate", DateType, true)
    ))

kuduContext.createTable(
    "invoices",
    invoicesSchema,
    Seq("id", "invoicenumber"),
    new CreateTableOptions().setNumReplicas(3).addHashPartitions(List("id").asJava, 6))

How can I solve this while using DateType? StringType and FloatType do not have this problem in the same code.

Tags: scala, apache-spark, apache-kudu

Solution


I would call this a workaround. Here is an example you will need to adapt, but it gives you the key points I think you need to know:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DateType}
import org.apache.spark.sql.functions._

val df = Seq( ("2018-01-01", "2018-01-31", 80)
            , ("2018-01-07","2018-01-10", 10)
            , ("2018-01-07","2018-01-31", 10)
            , ("2018-01-11","2018-01-31", 5)
            , ("2018-01-25","2018-01-27", 5)
            , ("2018-02-02","2018-02-23", 100)
            ).toDF("sd","ed","coins")

// Target type per column; cast each column and rebuild the DataFrame
val schema = List(("sd", "date"), ("ed", "date"), ("coins", "integer"))
val newColumns = schema.map(c => col(c._1).cast(c._2))
val newDF = df.select(newColumns:_*)
newDF.show(false)
...
...

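To connect this back to the original question: the stack trace shows that kudu-spark's `SparkUtil.sparkTypeToKuduType` rejects `DateType` when mapping the Spark schema to a Kudu schema. A commonly used workaround is to declare the column as `TimestampType` instead (Kudu stores it as `UNIXTIME_MICROS`) and cast the DataFrame column to match before writing. The sketch below is untested and assumes a `kuduContext` and a DataFrame `df` like those in the question; whether `DateType` itself is supported depends on your Kudu version.

```scala
import org.apache.kudu.client.CreateTableOptions
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import scala.collection.JavaConverters._

// Declare the date column as a timestamp so kudu-spark can map it
// (TimestampType -> Kudu UNIXTIME_MICROS), avoiding the DateType rejection.
val invoicesSchema = StructType(
    List(
        StructField("id", StringType, false),
        StructField("invoicenumber", StringType, false),
        StructField("invoicedate", TimestampType, true)   // was DateType
    ))

kuduContext.createTable(
    "invoices",
    invoicesSchema,
    Seq("id", "invoicenumber"),
    new CreateTableOptions().setNumReplicas(3).addHashPartitions(List("id").asJava, 6))

// Cast the DataFrame's date column to timestamp so it matches the table schema:
val ready = df.withColumn("invoicedate", col("invoicedate").cast(TimestampType))
```

If you need calendar dates downstream, you can cast back to `date` after reading from Kudu; the storage-side type just has to be one kudu-spark accepts.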