首页 > 解决方案 > Spark内置类型的Spark DataType Equality问题

问题描述

在运行 spark 应用程序时,我在催化剂内部遇到了错误。

例如:

java.lang.RuntimeException: scala.MatchError: LongType (of class org.apache.spark.sql.types.LongType$)
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$nullSafeCastFunction(Cast.scala:637)
org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:625)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)

我已将其范围缩小到火花计划中的以下内容:

Project [if (isnull(_rawTime#348L)) null else UDF(toTime(_rawTime#348L)) AS _time#438,

(请注意,我无法控制架构为空,因为我从 spark hbase 连接器获取此底层数据帧)

UDF 在哪里toTime花费很长时间并产生时间戳。LongType即使 match 语句具有以下内容,催化剂似乎也无法匹配:

 case LongType => castToLongCode(from, ctx)

有趣的是,当我第一次运行它时它运行良好。在第二次运行时,它有这个问题。

请注意,这是通过 apache Livy 运行的,因此执行之间的底层 spark 会话应该相同。

我在工作开始时放置了以下代码。

  logger.info("----------")
  logger.info(LongType + " " + System.identityHashCode(LongType))
  logger.info(DataTypes.LongType + " " + System.identityHashCode(DataTypes.LongType))
  logger.info("Equal " + (DataTypes.LongType == LongType))
  logger.info("----------")

然后运行它我看到:

first run:
----------
LongType 1044985410
LongType 1044985410
Equal true
----------
second run:
----------
LongType 355475697
LongType 1044985410
Equal false
----------

您可以在运行 2 中看到,对 LongType 的基于对象的调用与第一次运行时的标识不同。

Spark 的评论建议人们使用 DataTypes 中的单例。例如..DataTypes.LongType这是有道理的,因为它们看起来保持不变。但是,spark 自己的代码使用非单例。

LongType 定义为

/**
 * @since 1.3.0
 */
@InterfaceStability.Stable
case object LongType extends LongType

虽然DataTypes.LongType

public static final DataType LongType = LongType$.MODULE$;

其中指的是前者(案例对象)。单例将保持不变是有道理的。事实上,火花代码说Please use the singletonDataTypes.LongType ... 尽管内部火花代码的负载并没有这样做。对我来说,这感觉就像一个错误。

Spark 中的 Scala 代码编译得很好,然后由于类型上的这种突然的身份变化而失败,这似乎很奇怪。

所以我的问题是:

标签: scalaapache-sparkapache-spark-sql

解决方案


我已经解决了这个问题。

基本上所有的 DataType 实例都在 Scala 中定义为:

 * @since 1.3.0
 */
@InterfaceStability.Stable
case object LongType extends LongType

但是......在许多地方,Spark 使用 java 代码,这些代码使用单例获取 DataTypes:

 * Gets the LongType object.
 */
public static final DataType LongType = LongType$.MODULE$;

LongType$.MODULE$;是如何从 Java 领域调用案例对象。

但是我正在DataType使用 Kryo 将 a 序列化到 Livy,而 Kryo 正在内部重新初始化 LongType$.MODULE$;. 在 Scala 中,当您获得 case Object 时获得的引用不是与创建的第一个实例相关联,而是与创建的最后一个实例相关联。

所以时间线是:

  • 时间 0:DataTypes.LongType参考为 1, LongType参考也为 1。(其中ref仅表示参考)
  • 时间 1:Kryo 反序列化,因此重新实例化对象。但是,单例 DataTypes.LongType 指向第一个实例。即 DataTypes.LongType参考为 1, LongType参考为 2
  • time >=2:混乱随之而来——DataTypes 不会通过相等性检查。

解决方案是不要以这种方式将案例对象传递给 Kryo。可能由于某种原因我们没有正确使用 Kryo,或者我们需要使用 twitter/chill。


推荐阅读