scala - Spark内置类型的Spark DataType Equality问题
问题描述
在运行 spark 应用程序时,我在催化剂内部遇到了错误。
例如:
java.lang.RuntimeException: scala.MatchError: LongType (of class org.apache.spark.sql.types.LongType$)
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$nullSafeCastFunction(Cast.scala:637)
org.apache.spark.sql.catalyst.expressions.Cast.doGenCode(Cast.scala:625)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
我已将其范围缩小到火花计划中的以下内容:
Project [if (isnull(_rawTime#348L)) null else UDF(toTime(_rawTime#348L)) AS _time#438,
(请注意,我无法控制架构为空,因为我从 spark hbase 连接器获取此底层数据帧)
UDF 在哪里toTime
花费很长时间并产生时间戳。LongType
即使 match 语句具有以下内容,催化剂似乎也无法匹配:
case LongType => castToLongCode(from, ctx)
有趣的是,当我第一次运行它时它运行良好。在第二次运行时,它有这个问题。
请注意,这是通过 apache Livy 运行的,因此执行之间的底层 spark 会话应该相同。
我在工作开始时放置了以下代码。
logger.info("----------")
logger.info(LongType + " " + System.identityHashCode(LongType))
logger.info(DataTypes.LongType + " " + System.identityHashCode(DataTypes.LongType))
logger.info("Equal " + (DataTypes.LongType == LongType))
logger.info("----------")
然后运行它我看到:
first run:
----------
LongType 1044985410
LongType 1044985410
Equal true
----------
second run:
----------
LongType 355475697
LongType 1044985410
Equal false
----------
您可以在运行 2 中看到,对 LongType 的基于对象的调用与第一次运行时的标识不同。
Spark 的评论建议人们使用 DataTypes 中的单例。例如..DataTypes.LongType
这是有道理的,因为它们看起来保持不变。但是,spark 自己的代码使用非单例。
LongType 定义为
/**
* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType
虽然DataTypes.LongType
是
public static final DataType LongType = LongType$.MODULE$;
其中指的是前者(案例对象)。单例将保持不变是有道理的。事实上,火花代码说Please use the singleton
DataTypes.LongType .
.. 尽管内部火花代码的负载并没有这样做。对我来说,这感觉就像一个错误。
Spark 中的 Scala 代码编译得很好,然后由于类型上的这种突然的身份变化而失败,这似乎很奇怪。
所以我的问题是:
DataType
在 Spark中使用的建议是什么?我应该使用单例还是非单例?- 什么可能导致这个身份在我下面发生变化?
解决方案
我已经解决了这个问题。
基本上所有的 DataType 实例都在 Scala 中定义为:
* @since 1.3.0
*/
@InterfaceStability.Stable
case object LongType extends LongType
但是......在许多地方,Spark 使用 java 代码,这些代码使用单例获取 DataTypes:
* Gets the LongType object.
*/
public static final DataType LongType = LongType$.MODULE$;
这LongType$.MODULE$;
是如何从 Java 领域调用案例对象。
但是我正在DataType
使用 Kryo 将 a 序列化到 Livy,而 Kryo 正在内部重新初始化 LongType$.MODULE$;
. 在 Scala 中,当您获得 case Object 时获得的引用不是与创建的第一个实例相关联,而是与创建的最后一个实例相关联。
所以时间线是:
- 时间 0:
DataTypes.LongType
参考为 1,LongType
参考也为 1。(其中ref
仅表示参考) - 时间 1:Kryo 反序列化,因此重新实例化对象。但是,单例 DataTypes.LongType 指向第一个实例。即
DataTypes.LongType
参考为 1,LongType
参考为 2 - time >=2:混乱随之而来——DataTypes 不会通过相等性检查。
解决方案是不要以这种方式将案例对象传递给 Kryo。可能由于某种原因我们没有正确使用 Kryo,或者我们需要使用 twitter/chill。
推荐阅读
- python - 为什么我的查询变成布尔值,我该如何防止这种情况,所以我可以迭代它?
- c# - 如何使用插入语句从 foreach 获取值
- outlook-web-addins - 通过 Office 365 发送 Outlook
- numpy - 如何组合两列并应用二值化器?
- r - R 自定义最大功能
- react-native - 无法在本机反应中使组件与 flexDirection 水平对齐
- ajax - 收到错误“Symfony\Component\HttpKernel\Exception\HttpException”
- java - 如何将 PostgreSQL 与 Spring Mvc 连接起来
- c# - 如何测试单元测试中是否显示消息框?
- webassembly - Blazor 修改字符串 (toUpper) 失败