scala - Spark 中 distinct 之后执行 join 抛出 IndexOutOfBoundsException
问题描述
尝试连接(join)两个 DataFrame A 和 B。B 在连接之前先执行了一次 distinct 操作,并且 B 中的同一列同时与 A 中的两列进行连接。这种特定情况会抛出 IndexOutOfBoundsException。以前有人遇到过这种情况吗?
详情如下。提前致谢!
环境:
spark-shell standalone mode
Spark version 2.3.1
代码:
// Minimal reproduction, as reported on spark-shell (standalone mode) with Spark 2.3.1.
// Left side: val11 and val12 hold identical string values in every row.
val df1 = Seq((1, "one", "one"), (2, "two", "two")).toDF("key1", "val11", "val12")
// Right side: the row ("one", "first") appears twice, so distinct has a duplicate to remove.
val df2 = Seq(("one", "first"), ("one", "first"), ("two", "second")).toDF("key2", "val2")
// DataFrame-level distinct applied before the join.
val df3 = df2.distinct
// The same right-hand column (key2) is compared against two different left-hand columns.
val df4 = df1.join(df3, col("val11") === col("key2") and col("val12") === col("key2"))
// Running this action raises java.lang.IndexOutOfBoundsException: -1
// (see the stack trace below, which passes through EnsureRequirements.reorder).
df4.show(false)
异常:
java.lang.IndexOutOfBoundsException: -1
at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:65)
at scala.collection.immutable.List.apply(List.scala:84)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$reorder$1.apply(EnsureRequirements.scala:233)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$reorder$1.apply(EnsureRequirements.scala:231)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.reorder(EnsureRequirements.scala:231)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinKeys(EnsureRequirements.scala:255)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates$1.applyOrElse(EnsureRequirements.scala:277)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates$1.applyOrElse(EnsureRequirements.scala:273)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.org$apache$spark$sql$execution$exchange$EnsureRequirements$$reorderJoinPredicates(EnsureRequirements.scala:273)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:302)
at org.apache.spark.sql.execution.exchange.EnsureRequirements$$anonfun$apply$1.applyOrElse(EnsureRequirements.scala:294)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:294)
at org.apache.spark.sql.execution.exchange.EnsureRequirements.apply(EnsureRequirements.scala:37)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3249)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
at org.apache.spark.sql.Dataset.show(Dataset.scala:725)
at org.apache.spark.sql.Dataset.show(Dataset.scala:702)
... 49 elided
更新:可行的解决方案(感谢 @1pluszara!):
// Working variant: de-duplicate through the RDD API and rebuild the DataFrame
// with df2's schema, instead of calling distinct on the DataFrame itself.
val df1 = Seq(
  (1, "one", "one"),
  (2, "two", "two")
).toDF("key1", "val11", "val12")

val df2 = Seq(
  ("one", "first"),
  ("one", "first"),
  ("two", "second")
).toDF("key2", "val2")

val df3 = {
  // Distinct rows of df2, computed on the underlying RDD[Row].
  val uniqueRows = df2.rdd.distinct()
  spark.createDataFrame(uniqueRows, df2.schema)
}

val df4 = {
  // key2 is matched against both val11 and val12.
  val rightKey = col("key2")
  df1.join(df3, (col("val11") === rightKey) && (col("val12") === rightKey))
}

// Print the joined rows without truncating long cell values.
df4.show(truncate = false)
解决方案
我试了下面这种写法:
// De-duplicate df2 at the RDD level, turn each Row back into a (String, String)
// tuple, and rebuild a DataFrame with the original column names.
val df3 = {
  val uniqueRows = df2.rdd.distinct()
  val pairs = uniqueRows.map { case Row(k: String, v: String) => (k, v) }
  pairs.toDF("key2", "val2")
}

val df4 = {
  // key2 is matched against both val11 and val12.
  val rightKey = col("key2")
  df1.join(df3, (col("val11") === rightKey) && (col("val12") === rightKey))
}

// Print the joined rows without truncating long cell values.
df4.show(truncate = false)
输出:
+----+-----+-----+----+------+
|key1|val11|val12|key2|val2 |
+----+-----+-----+----+------+
|2 |two |two |two |second|
|1 |one |one |one |first |
+----+-----+-----+----+------+
但我不确定 DataFrame 版本在内部是如何执行的。
推荐阅读
- django - 是否存在干扰“授权:承载 {token}”数据的 AWS 入站策略?
- python - 如何在 Streamlit 上更改面积图的标签?
- java - Logback Spring - 在运行时使用 jar 外部的外部 yaml 文件更改日志级别
- c# - 错误:输入字符串的格式不正确
- spring-boot - 关于 ResponseStatusException 返回错误对象中的空消息字段
- json - Azure 资源管理器:Web 应用槽配置:应用服务身份验证
- google-apps-script - 我需要将 Google Sheet 1 中的数据复制到 Google Sheet2 中的一行
- python - 使用贪心方法的活动选择问题的不同解决方案
- angular - 是否可以在 TypeScript 中设置 Angular SelectionModel 对象的 CompareWith 函数
- python - 从包含路径的数据框列中提取字典到数据框