首页 > 解决方案 > Apache Spark:使用 JDBC 驱动程序写入 SQL Server/Azure DWH 的 BINARY 类型列的类型转换问题

问题描述

我最初的目标是将 UUId 值保存到 SQL Server/Azure DWH 到 BINARY(16) 类型的列。

例如,我有演示表:

CREATE TABLE [Events] ([EventId] [binary](16) NOT NULL)

我想使用 Spark 向它写入数据,如下所示:

import java.util.UUID

val uuid = UUID.randomUUID()
val uuidBytes = Array.ofDim[Byte](16)
ByteBuffer.wrap(uuidBytes)
        .order(ByteOrder.BIG_ENDIAN)
        .putLong(uuid.getMostSignificantBits())
        .putLong(uuid.getLeastSignificantBits()

val schema = StructType(
  List(
    StructField("EventId", BinaryType, false)
  )
)
val data = Seq((uuidBytes)).toDF("EventId").rdd;
val df = spark.createDataFrame(data, schema);

df.write
  .format("jdbc")
  .option("url", "<DATABASE_CONNECTION_URL>")
  .option("dbTable", "Events")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()

此代码返回错误: java.sql.BatchUpdateException: Conversion from variable or parameter type VARBINARY to target column type BINARY is not supported.

我的问题是如何应对这种情况并将 UUId 值插入 BINARY(16) 列?

我的调查:

Spark 使用 JdbcDialects 的概念,并为每个 Catalyst 类型映射到数据库类型,反之亦然。例如,这里是MsSqlServerDialect,它在我们处理 SQL Server 或 Azure DWH 时使用。在方法getJDBCType中可以看到映射:

case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))

这是我认为的问题的根源。

所以,我决定实现我自己的 JdbcDialect 来覆盖这个行为:

class SqlServerDialect extends JdbcDialect {
  override def canHandle(url: String) : Boolean = url.startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BinaryType => Option(JdbcType("BINARY(16)", java.sql.Types.BINARY))
    case _ => None
  }
}

val dialect = new SqlServerDialect
JdbcDialects.registerDialect(dialect)

通过此修改,我仍然会遇到完全相同的错误。看起来 Spark 不使用我的自定义方言的映射。但是我检查了方言是否已注册。所以这是奇怪的情况。

标签: sql-serverapache-sparkapache-spark-sql

解决方案


推荐阅读