首页 > 解决方案 > 如何在 spark-avro 2.4 模式中设置logicalType?

问题描述

我们从应用程序中的 avro 文件中读取时间戳信息。我正在测试从 Spark 2.3.1 到 Spark 2.4 的升级,其中包括新内置的 spark-avro 集成。但是,我无法弄清楚如何告诉 avro 模式我希望时间戳具有“timestamp-millis”的逻辑类型,而不是默认的“timestamp-micros”。

仅通过使用 Databricks spark-avro 4.0.0 包查看 Spark 2.3.1 下的测试 avro 文件,我们就有以下字段/模式:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

自 epoch 存储为 long 以来,其中的 searchTime 为毫秒。一切都很好。

当我升级到 Spark 2.4 和内置 spark-avro 2.4.0 包时,我有了这些更新的字段/模式:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

可以看到,底层类型仍然是 long,但现在增加了“timestamp-micros”的logicalType。这正是发行说明所说的那样,但是,我找不到指定架构以使用“timestamp-millis”选项的方法。

这成为一个问题,当我向 avro 文件写入一个 Timestamp 对象时,该对象被初始化为 10,000 秒后的 epoch,它将被读取为 10,000,000 秒。在 2.3.1/databricks-avro 下,它只是一个 long 没有与之相关的信息,所以它是刚进去就出来的。

我们目前通过反映感兴趣的对象来构建模式,如下所示:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

我尝试通过创建一个修改后的模式来扩充这一点,该模式试图替换与 searchTime 条目对应的 StructField,如下所示:

    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

但是,在 spark.sql.types 中定义的 StructField 对象没有可以扩充其中的 dataType 的logicalType 的概念。

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

我还尝试通过两种方式从 JSON 表示创建模式:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

第一次尝试只是从中创建一个 DataType

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

失败是因为它无法为 searchTime 节点创建 StructType,因为其中包含“logicalType”。第二次尝试是通过传入原始 JSON 字符串来简单地创建模式。

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

这没有说:

mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

我发现在spark-avro API中有一种方法可以从模式中获取逻辑类型,但无法弄清楚如何设置。

正如您在上面看到的失败尝试,我尝试使用 Schema.Parser 创建 avro 模式对象,但 spark.read.schema 中唯一接受的类型是 String 和 StructType。

如果有人可以提供有关如何更改/指定此逻辑类型的见解,我将非常感激。谢谢

标签: scalaapache-sparkavrospark-avro

解决方案


好吧,我想我回答了我自己的问题。当我修改以编程方式构建的架构以使用显式时间戳类型时

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

当我们读取一个 Row 对象时,我没有改变逻辑。最初,我们会读取 Long 并将其转换为 Timestamp,这是出现问题的地方,因为它以微秒为单位读取 Long,这将使其比我们预期的大 1,000 倍。将我们的 read 更改为直接读取 Timestamp 对象让底层逻辑对此进行解释,从而将其从我们(我的)手中夺走。所以:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS

推荐阅读